Я использую Spark Streaming для обработки данных между двумя очередями Kafka, но я не могу найти хороший способ писать на Kafka из Spark. Я попробовал это:
input.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach {
case x: String => {
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String]("output", null, x)
producer.send(message)
}
}
)
)
и он работает как задумано, но создание нового KafkaProducer для каждого сообщения явно невозможно в реальном контексте, и я пытаюсь обойти это.
Я хотел бы сохранить ссылку на один экземпляр для каждого процесса и обращаться к нему, когда мне нужно отправить сообщение. Как я могу написать Кафке из Spark Streaming?