Подтвердить что ты не робот

AbstractMethodError, создающий поток Kafka

Я пытаюсь открыть поток Kafka (проверенные версии 0.11.0.2 и 1.0.1) с createDirectStream метода createDirectStream и получить эту ошибку AbstractMethodError:

Exception in thread "main" java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)

Вот как я это называю:

val preferredHosts = LocationStrategies.PreferConsistent
    val kafkaParams = Map(
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[IntegerDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest"
    )

    val aCreatedStream = createDirectStream[String, String](ssc, preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

У меня есть Kafka, работающий на 9092, и я могу создавать производителей и потребителей и передавать сообщения между ними, поэтому не уверен, почему он не работает с кодом Scala. Любые идеи оценили.

4b9b3361

Ответ 1

Оказывается, я использовал Spark 2.3, и я должен был использовать Spark 2.2. По-видимому, этот метод был сделан абстрактным в более поздней версии, поэтому я получил эту ошибку.

Ответ 2

У меня было то же исключение, в моем случае я создал приложение jar с зависимостью от spark-streaming-kafka-0-10_2.11 версии 2.1.0, пытаясь развернуть кластер Spark 2.3.0.

Ответ 3

Я получил ту же ошибку. Я установил в моих зависимостях ту же версию, что и мой искровой интерпретатор

%spark2.dep
z.reset()
z.addRepo("MavenCentral").url("https://mvnrepository.com/")

z.load("org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0")
z.load("org.apache.kafka:kafka-clients:2.3.0")