Подтвердить что ты не робот

Класс секвенсора KafKa, назначить сообщение для раздела внутри темы с помощью клавиши

Я новичок в kafka, так извиняюсь, если я говорю глупо, но то, что я понял до сих пор is.. Поток сообщения может быть определен как тема, например категория. И каждая тема делится в один или несколько разделов (каждый раздел может иметь несколько реплик). поэтому они действуют параллельно

На главном сайте Kafka говорится:

Производитель может выбрать, какое сообщение назначить для какого раздела в теме. Это можно сделать круговым способом просто для балансировки нагрузки, или это можно сделать в соответствии с некоторой функцией семантического разбиения (например, на основе некоторого ключа в сообщении).

Означает ли это, что во время потребления я смогу выбрать смещение сообщения из определенного раздела? При запуске нескольких разделов можно выбрать один конкретный раздел i.e partition 0?

В Kafka 0.7 быстрый старт говорят

Отправьте сообщение с помощью ключа раздела. Сообщения с одним и тем же ключом отправляются на один и тот же раздел.

И ключ может быть предоставлен при создании производителя ниже

    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-key", "test-message");
    producer.send(data);

Теперь, как я могу использовать сообщение на основе этого ключа? каково фактическое влияние использования этого ключа при производстве в Кафке?

При создании производителя в 0.8beta мы можем предоставить атрибут класса разделителя через файл конфигурации. Пользовательский класс разделителей может быть, возможно, создан, реализуя интерфейс kafka-разделитель. Но мало что смущает, как именно это работает. 0.8 док также не объясняет многое. Любой совет или я что-то не хватает?

4b9b3361

Ответ 1

Это то, что я нашел до сих пор..

Определите наш собственный класс разделителей, выполнив интерфейс kafka Partitioner. Внедренный метод будет иметь два аргумента, сначала ключ, который мы предоставляем от производителя, и следующее количество доступных разделов. Таким образом, мы можем определить свою собственную логику, чтобы установить, какой ключ сообщения переходит в какой раздел.

Теперь при создании продюсера мы можем указать наш собственный класс разделителей, используя атрибут "partitioner.class"

    props.put("partitioner.class", "path.to.custom.partitioner.class");

Если мы не упомянем об этом, Kafka будет использовать свой класс по умолчанию и попытаться распределить сообщение равномерно среди доступных разделов.

Также сообщите Кафке, как сериализовать ключ

    props.put("key.serializer.class", "kafka.serializer.StringEncoder");

Теперь, если мы отправим какое-либо сообщение с использованием ключа в продюсере, сообщение будет доставлено на определенный раздел (на основе нашей логики, написанной на пользовательском классе разделителя), и на уровне потребителя (SimpleConsumer) мы можем указать раздел для получения определенных сообщений.

В случае, если нам нужно передать String в качестве ключа, то же самое нужно обработать в пользовательском классе разделителя (взять хэш-значение ключа, а затем взять первые две цифры и т.д.)

Ответ 2

Каждая тема в Кафке разделяется на многие разделы. Раздел позволяет увеличивать пропускную способность при параллельном потреблении.

Продюсер публикует сообщение в теме с помощью клиентской библиотеки производителя Kafka, которая балансирует сообщения через доступные разделы с помощью Partitioner. Брокер, к которому подключается производитель, заботится о передаче сообщения брокеру, который является лидером этого раздела, используя информацию владельца раздела в zookeeper. Потребители используют Kafkas Высокоуровневую потребительскую библиотеку (которая управляет изменениями брокера, управляет информацией о смещении в zookeeper и неявно понимает информацию о владельце раздела и т.д.), Чтобы потреблять сообщения из разделов в потоках; каждый поток может быть отображен на несколько разделов в зависимости от того, как потребитель выбирает создание потоков сообщений.

Например, если 10 разделов для темы и 3 экземпляра клиента (C1, C2, C3 запущены в этом порядке), принадлежащие к одной и той же группе потребителей, мы можем иметь разные модели потребления, которые позволяют читать parallelism как ниже

Каждый потребитель использует один поток.  В этой модели, когда C1 запускает все 10 разделов темы, отображаются в один и тот же поток, и C1 начинает потреблять из этого потока. Когда начинается C2, Kafka перебалансирует разделы между двумя потоками. Таким образом, каждый поток будет назначен на 5 разделов (в зависимости от алгоритма ребалансировки он может также быть 4 против 6), и каждый потребитель потребляет его поток. Аналогично, когда начинается C3, разделы снова балансируются между тремя потоками. Обратите внимание, что в этой модели при потреблении из потока, назначенного более чем одному разделу, порядок сообщений будет перемешаться между разделами.

Каждый потребитель использует несколько потоков (например, C1 использует 3, C2 использует 3, а C3 использует 4). В этой модели, когда C1 запускается, все 10 разделов назначаются 3 потокам, а C1 может потреблять из трех потоков одновременно с использованием нескольких потоков. Когда начинается C2, разделы перебалансируются между 6 потоками, и аналогично, когда начинается C3, разделы перебалансируются между 10 потоками. Каждый потребитель может потреблять одновременно из нескольких потоков. Обратите внимание, что количество потоков и разделов здесь равно. В случае, если количество потоков превышает разделы, некоторые потоки не получат никаких сообщений, поскольку им не будут назначены какие-либо разделы.

Ответ 3

Означает ли это, что во время потребления я смогу выбрать смещение сообщения из определенного раздела? При запуске нескольких разделов можно выбрать один конкретный раздел i.e partition 0?

Да, вы можете выбрать сообщение от одного конкретного раздела от своего потребителя, но если вы хотите, чтобы это было идентифицировано динамически, это зависит от логики того, как вы реализовали класс Partitioner в своем продюсере.

Теперь, как я могу использовать сообщение на основе этого ключа? каково фактическое влияние использования этого ключа при производстве в Кафке?

Есть два способа потребления сообщения. Один использует Zookeeper Host, а другой - статический хост. Хозяин Zookeper потребляет сообщение из всех разделов. Однако, если вы являетесь статическим хостом uisng, вы можете предоставить брокеру номер раздела, который необходимо использовать.

Пожалуйста, проверьте ниже пример Kafka 0.8

Producer

KeyedMessage<String, String> data = new KeyedMessage<String, String>(<<topicName>>, <<KeyForPartition>>, <<Message>>);

Класс раздела

   public int partition(Object arg0, int arg1) {
        // arg0 is the key given while producing, arg1 is the number of
        // partition the broker has
        long organizationId = Long.parseLong((String) arg0);
        // if the given key is less than the no of partition available then send
        // it according to the key given Else send it to the last partition
        if (arg1 < organizationId) {

            return (arg1 - 1);
        }
        // return (int) (organizationId % arg1);
        return Integer.parseInt((String) arg0);
    }

Итак, класс partiotioner решает, куда отправить сообщение на основе вашей логики.

Потребитель (PN: я использовал интеграцию Storm Kafka 0.8)

        HostPort hosts = new HostPort("10.**.**.***",9092);

        GlobalPartitionInformation gpi = new GlobalPartitionInformation();
        gpi.addPartition(0, hosts);
        gpi.addPartition(2, hosts);

        StaticHosts statHost = new StaticHosts(gpi);

        SpoutConfig spoutConf = new SpoutConfig(statHost, <<topicName>>, "/kafkastorm", <<spoutConfigId>>);