
Apache Spark EOFException

I'm getting an EOFException when running a simple job that reads a text file and collects the results. It works fine on my dev machine, but it fails when I run it in standalone mode (a single machine, master + worker). My setup is Apache Spark 0.9.1, prebuilt for Hadoop 2.

I deploy my code with the sbt-assembly plugin, which builds an executable jar.

The relevant stack trace:

14/05/27 08:22:03 WARN scheduler.TaskSetManager: Loss was due to java.io.EOFException
java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2742)
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1014)
    at org.apache.hadoop.io.WritableUtils.readCompressedByteArray(WritableUtils.java:39)
    at org.apache.hadoop.io.WritableUtils.readCompressedString(WritableUtils.java:87)
    at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:185)
    at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2378)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:622)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1001)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1892)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
    at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:622)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1001)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1892)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:622)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1001)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1892)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:622)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1001)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1892)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:622)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1001)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1892)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:622)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1001)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1892)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:622)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1001)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1892)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1836)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:701)

[EDIT]

Note that I have changed the serializer and am now using Kryo (I only tried it to find out whether that was the problem or not).

My Spark context:

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf

//Load Spark config file
lazy val conf = ConfigFactory.load

//Set Spark config object
val sparkConf = new SparkConf()
      .setMaster(conf.getString("spark.prod.master"))  //Something like spark://host:port
      .setAppName(conf.getString("app.name"))
      .set("spark.executor.memory", conf.getString("spark.prod.config.executorMemory"))
      .set("spark.cores.max", conf.getString("spark.prod.config.coresMax"))
      .set("spark.serializer", conf.getString("spark.prod.config.serializer"))
      .set("spark.kryo.registrator", conf.getString("spark.prod.config.kryoRegistrator"))
      .set("spark.kryoserializer.buffer.mb", conf.getString("spark.prod.config.kryoSerializerBufferSize"))
      .set("spark.logConf", conf.getString("spark.prod.config.logConf"))

Any hints?


Answer 1

After a few days of struggling with this, I finally found the solution. I had to add the matching hadoop-client dependency to get rid of this strange exception.
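For an sbt build like the one described in the question, adding that dependency might look roughly like the fragment below. This is only a sketch: the hadoop-client version shown is an assumption and has to match the Hadoop version your Spark build was compiled against.

```scala
// build.sbt (fragment)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "0.9.1",
  // Assumption: 2.2.0 is a placeholder version, not taken from the answer.
  // Use the Hadoop version that your Spark distribution targets.
  "org.apache.hadoop" % "hadoop-client" % "2.2.0"
)
```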

After that, another already reported error showed up: connection refused. It was solved as follows:

  • Edit sbin/start-master.sh and/or sbin/start-slaves.sh and set $SPARK_MASTER_IP to the output of hostname -f instead of hostname. It seems Akka only works with fully qualified domain names, not with short hostnames or IP addresses.
  • Also set $SPARK_MASTER_IP in conf/spark-env.sh to the output of hostname -f, so that the cluster workers can reach the master.
  • Make sure conf/slaves also uses fully qualified domain names rather than short hostnames or IP addresses.
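To see which fully qualified name the JVM resolves for a machine, and to build a master URL that uses the same name, a small check like the one below could help. It is a sketch under assumptions: MasterUrlCheck and masterUrl are hypothetical names, and 7077 is assumed because it is the default standalone master port.

```scala
import java.net.InetAddress

object MasterUrlCheck {
  // Builds a standalone master URL from a host name.
  // Assumption: the master listens on the default port 7077.
  def masterUrl(host: String, port: Int = 7077): String =
    s"spark://$host:$port"

  def main(args: Array[String]): Unit = {
    // Returns the fully qualified name if it can be resolved,
    // otherwise a textual IP address.
    val fqdn = InetAddress.getLocalHost.getCanonicalHostName
    println(masterUrl(fqdn))
  }
}
```

If the printed URL differs from the one passed to setMaster, or from the one shown by the master, that mismatch is worth checking first.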

After these changes, everything just worked.

I hope this helps someone else!