Подтвердить что ты не робот

Spark Streaming - читайте и пишите в теме Кафки

Я использую Spark Streaming для обработки данных между двумя очередями Kafka, но я не могу найти хороший способ писать на Kafka из Spark. Я попробовал это:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)

и он работает как задумано, но создание нового KafkaProducer для каждого сообщения явно невозможно в реальном контексте, и я пытаюсь обойти это.

Я хотел бы сохранить ссылку на один экземпляр для каждого процесса и обращаться к нему, когда мне нужно отправить сообщение. Как я могу написать Кафке из Spark Streaming?

4b9b3361

Ответ 1

Моим первым советом было бы попытаться создать новый экземпляр в foreachPartition и измерить, если это достаточно быстро для ваших нужд (создание экземпляров тяжелых объектов в foreachPartition - это то, что предлагает официальная документация).

Другой вариант - использовать пул объектов, как показано в этом примере:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

Однако мне было сложно реализовать при использовании контрольной точки.

Другая версия, которая хорошо работает для меня, - это factory, как описано в следующем сообщении в блоге, вам просто нужно проверить, достаточно ли она для вас предоставить parallelism (см. раздел комментариев):

http://allegro.tech/2015/08/spark-kafka-integration.html

Ответ 2

Да, к сожалению, Spark (1.x, 2.x) не делает прямолинейным, как эффективно писать в Kafka.

Я бы предложил следующий подход:

  • Использовать (и повторно использовать) один экземпляр KafkaProducer для каждого процесса-исполнителя/JVM.

Здесь высокоуровневая настройка для этого подхода:

  • Во-первых, вы должны "обернуть" Kafka KafkaProducer, потому что, как вы упомянули, это не сериализуемо. Обертка позволяет вам "отправить" его исполнителям. Основная идея здесь заключается в использовании lazy val, чтобы вы задерживали создание экземпляра продюсера до его первого использования, что является эффективным способом обхода, поэтому вам не нужно беспокоиться о том, что KafkaProducer не может быть сериализуемым.
  • Вы "отправляете" завернутый производитель каждому исполнителю с использованием широковещательной переменной.
  • В вашей логике обработки вы получаете доступ к завернутому производителю через переменную широковещания и используете его для записи результатов обработки обратно в Kafka.

Ниже приведены фрагменты кода, работающие с Spark Streaming с Spark 2.0.

Шаг 1: Обертка KafkaProducer

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}

Шаг 2. Используйте переменную широковещательной передачи, чтобы предоставить каждому исполнителю свой собственный завернутый KafkaProducer экземпляр

import org.apache.kafka.clients.producer.ProducerConfig

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}

Шаг 3: Запись из Spark Streaming в Kafka, повторное использование того же завернутого экземпляра KafkaProducer (для каждого исполнителя)

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    metadata.foreach { metadata => metadata.get() }
  }
}

Надеюсь, что это поможет.

Ответ 3

Существует Streaming Kafka Writer, поддерживаемый Cloudera (фактически выделенный из Spark JIRA [1]). Он в основном создает производителя на раздел, который амортизирует время, затрачиваемое на создание "тяжелых" объектов над коллекцией элементов (надеюсь, большой).

Писатель можно найти здесь: https://github.com/cloudera/spark-kafka-writer

Ответ 4

У меня была такая же проблема, и я нашел этот пост.

Автор решает проблему, создавая 1 продюсера на исполнителя. Вместо того, чтобы отправлять самого продюсера, он посылает только "рецепт", как создать продюсера в исполнителе путем его трансляции.

    val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

Он использует обертку, которая лениво создает производителя:

    class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new     ProducerRecord(topic, value))
    }


    object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
        val f = () => {
          val producer = new KafkaProducer[String, String](config)

          sys.addShutdownHook {
            producer.close()
          }

          producer
        }
        new KafkaSink(f)
      }
    }

Оболочка является сериализуемой, потому что производитель Kafka инициализируется непосредственно перед первым использованием на исполнителе. Драйвер сохраняет ссылку на оболочку, а оболочка отправляет сообщения с использованием каждого исполнителя-исполнителя:

    dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
        kafkaSink.value.send("topicName", message)
      }
    }

Ответ 5

Почему это неосуществимо? По сути, каждый раздел каждого RDD будет запускаться независимо (и может хорошо работать на другом кластере node), поэтому вам нужно повторить соединение (и любую синхронизацию) в начале каждой задачи раздела. Если накладные расходы слишком высоки, вы должны увеличить размер партии в StreamingContext до тех пор, пока она не станет приемлемой (для этого требуется задержка ожидания).

(Если вы не обрабатываете тысячи сообщений в каждом разделе, уверены ли вы, что вам нужно искрообразование вообще? Не могли бы вы улучшить работу с автономным приложением?)

Ответ 6

С искрой> = 2.2

Операции чтения и записи возможны на Kafka с использованием API структурированной потоковой передачи.

Построить стрим из темы Кафка

// Subscribe to a topic and read messages from the earliest to latest offsets
val ds= spark
  .readStream // use 'read' for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

Прочитайте ключ и значение и примените схему к обоим, для простоты мы делаем преобразование их обоих в тип String.

val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Поскольку dsStruc имеет схему, она принимает все операции типа SQL, такие как filter, agg, select..etc.

Написать стрим в тему Кафки

dsStruc
  .writeStream // use 'write' for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()

Дополнительная конфигурация для интеграции Kafka для чтения или записи

Ключевые артефакты для добавления в приложение

 "org.apache.spark" % "spark-core_2.11" % 2.2.0,
 "org.apache.spark" % "spark-streaming_2.11" % 2.2.0,
 "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,

Ответ 7

Это может быть то, что вы хотите сделать. Вы в основном создаете одного производителя для каждого раздела записей.

input.foreachRDD(rdd =>
      rdd.foreachPartition(
          partitionOfRecords =>
            {
                val props = new HashMap[String, Object]()
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                val producer = new KafkaProducer[String,String](props)

                partitionOfRecords.foreach
                {
                    case x:String=>{
                        println(x)

                        val message=new ProducerRecord[String, String]("output",null,x)
                        producer.send(message)
                    }
                }
          })
) 

Надеюсь, что поможет