Подтвердить что ты не робот

Kafka: Получить хостинг-провайдер от ZooKeeper

По каким-то причинам мне нужно использовать как - ConsumerGroup (потребитель высокого уровня a.k.a.), так и SimpleConsumer (потребитель низкого уровня a.k.a.) для чтения из Kafka. Для ConsumerGroup я использую конфигурацию на основе ZooKeeper, и я полностью ее доволен, но SimpleConsumer требует, чтобы посредники-посредники были созданы.

Я не хочу вести список обоих - ZooKeeper и хостов-брокеров. Таким образом, я ищу способ автоматически обнаружить брокеров для определенной темы из ZooKeeper.

Из-за некоторой косвенной информации я полагаю, что эти данные хранятся в ZooKeeper по одному из следующих путей:

  • /brokers/topics/<topic>/partitions/<partition-id>/state
  • /брокеры/иды/

Однако, когда я пытаюсь прочитать данные из этих узлов, я получаю ошибку сериализации (я использую для этого com.101tec.zkclient):

org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: недопустимый заголовок потока: 7B226A6D   at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37)   на org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740)   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773)   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)   на org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)   ... 64 удаленных Вызывается: java.io.StreamCorruptedException: недопустимый заголовок потока: 7B226A6D   в java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)   в java.io.ObjectInputStream. (ObjectInputStream.java:299)   at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream. (TcclAwareObjectIputStream.java:30)   at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31)   ... 69 подробнее

Я могу писать и читать пользовательские объекты Java (например, строки) без каких-либо проблем, поэтому я считаю, что это не проблема клиента, а довольно сложная кодировка. Таким образом, я хочу знать:

  • Если это правильный путь, как правильно прочитать эти узлы?
  • Если весь подход неверен, , что является правильным?
4b9b3361

Ответ 1

Именно так я сделал один из моих коллег, чтобы получить список брокеров Kafka. Я считаю, что это правильный способ, когда вы хотите динамически получать список брокеров.

Вот пример кода, который показывает, как получить список.

public class KafkaBrokerInfoFetcher {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
            System.out.println(id + ": " + brokerInfo);
        }
    }
}

Запуск кода в кластер, состоящий из трех брокеров, приводит к

1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}

Ответ 2

Оказывается, что Kafka использует ZKStringSerializer для чтения и записи данных в znodes. Итак, чтобы исправить ошибку, мне пришлось добавить ее в качестве последнего параметра в конструктор ZkClient:

val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)

Используя его, я написал несколько полезных функций для обнаружения идентификаторов брокеров, их адресов и других вещей:

import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException


def listBrokers(): List[Int] = {
  zkClient.getChildren("/brokers/ids").toList.map(_.toInt)
}

def listTopics(): List[String] = {
  zkClient.getChildren("/brokers/topics").toList
}

def listPartitions(topic: String): List[Int] = {
  val path = "/brokers/topics/" + topic + "/partitions"
  if (zkClient.exists(path)) {
    zkClient.getChildren(path).toList.map(_.toInt)
  } else {
    throw new KafkaException(s"Topic ${topic} doesn't exist")
  }
}

def getBrokerAddress(brokerId: Int): (String, Int) = {
  val path = s"/brokers/ids/${brokerId}"
  if (zkClient.exists(path)) {
    val brokerInfo = readZkData(path)
    (brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
  } else {
    throw new KafkaException("Broker with ID ${brokerId} doesn't exist")
  }
}

def getLeaderAddress(topic: String, partitionId: Int): (String, Int) = {
  val path = s"/brokers/topics/${topic}/partitions/${partitionId}/state"
  if (zkClient.exists(path)) {
    val leaderStr = zkClient.readData[String](path)
    val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
    getBrokerAddress(leaderId)
  } else {
    throw new KafkaException(s"Topic (${topic}) or partition (${partitionId}) doesn't exist")
  }
}

Ответ 3

Для этого используйте оболочку:

zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
  => [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0 

Ответ 4

на самом деле есть ZkUtils изнутри Kafka (по крайней мере для линии 0.8.x), которую вы можете использовать с одним небольшим предостережением: вам нужно будет повторно реализовать ZkStringSerializer, который преобразует строки как кодированные в UTF-8 байтовые массивы. Если вы хотите использовать Java8 потоковые API, вы можете перебирать Scala коллекции throug scala.collection.JavaConversions. Это то, что помогло моему делу.

Ответ 5

 public KafkaProducer(String zookeeperAddress, String topic) throws IOException,
        KeeperException, InterruptedException {

    this.zookeeperAddress = zookeeperAddress;
    this.topic = topic;

    ZooKeeper zk = new ZooKeeper(zookeeperAddress, 10000, null);
    List<String> brokerList = new ArrayList<String>();

    List<String> ids = zk.getChildren("/brokers/ids", false);
    for (String id : ids) {
        String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
        Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
        if (broker != null) {
            brokerList.add(broker.connectionString());
        }
    }

    props.put("serializer.class", KAFKA_STRING_ENCODER);
    props.put("metadata.broker.list", String.join(",", brokerList));
    producer = new Producer<String, String>(new ProducerConfig(props));
}