Подтвердить что ты не робот

Регистрация Apache Spark в Scala

Я ищу решение, позволяющее записывать дополнительные данные при выполнении кода в Apache Spark Nodes, которые могут помочь в дальнейшем выявить некоторые проблемы, которые могут возникнуть во время выполнения. Пытаться использовать традиционное решение, например, com.typesafe.scalalogging.LazyLogging, не удается, потому что экземпляр журнала не может быть сериализован в распределенной среде, такой как Apache Spark.

Я исследовал эту проблему, и на данный момент решение, которое я нашел, это использовать черту org.apache.spark.Logging следующим образом:

class SparkExample with Logging {
  val someRDD = ...
  someRDD.map {
    rddElement => logInfo(s"$rddElement will be processed.")
    doSomething(rddElement)
  }
}

Однако похоже, что свойство Logging не является постоянным решением для Apache Spark, поскольку оно помечено как @DeveloperApi, и в документации по классу упоминается:

Вероятно, это будет изменено или удалено в будущих выпусках.

Мне интересно - это любое известное решение для ведения журнала, которое я могу использовать, и позволяю мне регистрировать данные, когда RDD выполняются на узлах Apache Spark?

@Later Edit. Некоторые из комментариев ниже предлагают использовать Log4J. Я попытался использовать Log4J, но у меня все еще возникают проблемы при использовании регистратора из класса Scala (а не объекта Scala). Вот мой полный код:

import org.apache.log4j.Logger
import org.apache.spark._

object Main {
 def main(args: Array[String]) {
  new LoggingTestWithRDD().doTest()
 }
}

class LoggingTestWithRDD extends Serializable {

  val log = Logger.getLogger(getClass.getName)

  def doTest(): Unit = {
   val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
   val spark = new SparkContext(conf)

   val someRdd = spark.parallelize(List(1, 2, 3))
   someRdd.map {
     element =>
       log.info(s"$element will be processed")
       element + 1
    }
   spark.stop()
 }

}

Исключением, которое я вижу, является:

Исключение в потоке "main" org.apache.spark.SparkException: Задача не сериализуема → Причиняется: java.io.NotSerializableException: org.apache.log4j.Logger

4b9b3361

Ответ 1

Вы можете использовать решение Akhil, предложенное в
https://www.mail-archive.com/[email protected]/msg29010.html. Я использовал сам, и он работает.

Ахиль Дас Мон, 25 мая 2015 года 08:20:40 -0700
Попробуйте следующим образом:

object Holder extends Serializable {      
   @transient lazy val log = Logger.getLogger(getClass.getName)    
}


val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element =>
   Holder.log.info(element)
}

Ответ 2

val log = Logger.getLogger(getClass.getName),

Вы можете использовать "журнал" для записи журналов. Кроме того, если вам нужны свойства журнала изменений, вам нужно иметь log4j.properties в папке /conf. По умолчанию у нас будет шаблон в этом месте.

Ответ 3

Использовать Log4j 2.x. Основной регистратор был сериализуемым. Проблема решена.

Обсуждение Jira: https://issues.apache.org/jira/browse/LOG4J2-801

"org.apache.logging.log4j" % "log4j-api" % "2.x.x"

"org.apache.logging.log4j" % "log4j-core" % "2.x.x"

"org.apache.logging.log4j" %% "log4j-api- scala" % "2.x.x"

Ответ 4

Вот мое решение:

Я использую SLF4j (с привязкой Log4j), в моем базовом классе каждой искровой работы у меня есть что-то вроде этого:

import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass) 

Как раз перед тем местом, где я использую LOG в распределенном функциональном коде, я скопирую ссылку на локатор на локальную константу.

val LOG = this.LOG

Это сработало для меня!