Kafka Consumer: запись не найдена - программирование

Kafka Consumer: запись не найдена

Я пытаюсь проверить потребителя kafka, используя данные из удаленного кластера Kafka. Я получаю следующую ошибку при использовании kafka-console-consumer.sh:

 ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: No entry found for connection 2147475658
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:885)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:276)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:655)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:635)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:436)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

Вот команда, которую я использую:

./bin/kafka-console-consumer.sh --bootstrap-server SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT} --consumer.config ./config/consumer.properties --topic MYTOPIC --group MYGROUP

Вот файл ./config/consumer.properties:

bootstrap.servers=SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT}

# consumer group id
group.id=MYGROUP

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=earliest

#### Security
security.protocol=SSL
ssl.key.password=test1234
ssl.keystore.location=/opt/kafka/config/certs/keystore.jks
ssl.keystore.password=test1234
ssl.truststore.location=/opt/kafka/config/certs/truststore.jks
ssl.truststore.password=test1234

Ты хоть представляешь, в чем проблема?

4b9b3361

Ответ 1

Я нашел проблему. Это была проблема с DNS в конце. Я связывался с брокерами Kafka по IP-адресам, но брокер отвечает DNS-именем. После настройки DNS-имен на стороне потребителя он снова начал работать.

Ответ 2

У меня была эта проблема (с потребителями и производителями) при запуске Kafka и Zookeeper в качестве контейнеров Docker.

Решение состояло в том, чтобы установить advertised.listeners в файле config/server.properties брокеров Kafka, чтобы он содержал IP-адрес контейнера, например

advertised.listeners=PLAINTEXT://172.15.0.8:9092

См. Https://github.com/maxant/kafkaplayground/blob/master/start-kafka.sh для примера сценария, используемого для запуска Kafka внутри контейнера после правильной настройки файла свойств.

Ответ 3

Кажется, что свойство слушателя кластера Kafka не настроено в server.properties.

В удаленном кластере kafka это свойство должно быть раскомментировано с правильным именем хоста.

listeners=PLAINTEXT://0.0.0.0:9092

Ответ 4

Вы уверены, что удаленная кафка запущена? Я бы предложил запустить nmap -p PORT HOST, чтобы убедиться, что порт открыт (если он не настроен иначе, порт должен быть 9092). Если это нормально, тогда вы можете использовать kafkacat, который делает вещи проще. Создание потребителя, запускающего kafkacat -b HOST:PORT -t YOUR_TOPIC -C -o beginning или создание производителя, запускающего kafkacat -b HOST:PORT -t YOUR_TOPIC -p

Ответ 5

Можете ли вы указать, как это исправить. У меня та же проблема

Ответ 6

Вы можете увидеть сообщение, что запись не найдена, если версия RestClient отличается от вашей версии ElasticSearch

Ответ 7

В моем случае я получал, что при попытке подключиться к контейнеру Kafka мне пришлось передать следующее:

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

Надеюсь, это поможет кому-то

Ответ 8

У меня та же проблема, я тоже сказал @yeralin, но я получаю это сообщение. Мой docker-compose.yml такой:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.2.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://localhost:9092,PLAINTEXT://broker:29092

  connect:
    image: confluentinc/kafka-connect-datagen:latest
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.1.1 Connect image
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run$

  control-center:
    image: confluentinc/cp-enterprise-control-center:5.2.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
      - connect
      - ksql-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
      CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksql-server:
    image: confluentinc/cp-ksql-server:5.2.1
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "cp-all-in-one"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"

  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.2.1
    container_name: ksql-cli
    depends_on:
      - broker
      - connect
      - ksql-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    # Downrev ksql-examples to 5.1.2 due to DEVX-798 (work around issues in 5.2.0)
    image: confluentinc/ksql-examples:5.1.2
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksql-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.2.1
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'

Я следил за страницей @rmoff, но ничего. Я пытаюсь прослушивать события моей базы данных MySQL на AWS EC2, также у меня есть экземпляр с Ubuntu, где я запускаю Kafka в Docker Container.

При этом состояние моего разъема уже работает и его задача тоже. Но я пока не вижу данных из своей таблицы foobar.

Ответ 9

В моем случае не удалось найти идентификатор брокера (2147475658), упомянутый по ошибке.

No entry found for connection 2147475658

Вы можете создать брокера с идентификатором 2147475658, установив свойство broker.id в файле server.properties. Создайте отдельные файлы server.properties для всех брокеров.

Или, если у вас есть хотя бы один живой брокер, вы можете удалить/удалить брокера, который выдает ошибку.

Ссылка на документацию: https://kafka.apache.org/documentation/#quickstart_multibroker