Подтвердить что ты не робот

NullPointerException в Scala Искры, похоже, вызвано типом коллекции?

sessionIdList относится к типу:

scala> sessionIdList
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30

Когда я пытаюсь запустить приведенный ниже код:

val x = sc.parallelize(List(1,2,3)) 
val cartesianComp = x.cartesian(x).map(x => (x))

val kDistanceNeighbourhood = sessionIdList.map(s => {
    cartesianComp.filter(v => v != null)
})

kDistanceNeighbourhood.take(1)

Я получаю исключение:

14/05/21 16:20:46 ERROR Executor: Exception in task ID 80
java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:38)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)

Однако, если я использую:

val l = sc.parallelize(List("1","2")) 
val kDistanceNeighbourhood = l.map(s => {    
    cartesianComp.filter(v => v != null)
})

kDistanceNeighbourhood.take(1)

Тогда исключение не отображается

Разница между двумя фрагментами кода заключается в том, что в первом фрагменте sessionIdList имеет тип:

res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30

и во втором фрагменте "l" имеет тип

scala> l
res13: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at <console>:12

Почему происходит эта ошибка?

Нужно ли конвертировать sessionIdList в ParallelCollectionRDD, чтобы это исправить?

4b9b3361

Ответ 1

Spark не поддерживает вложенность RDD (см. fooobar.com/questions/94792/... для другого возникновения той же проблемы), поэтому вы не можете выполнять преобразования или действия на RDD внутри других операций RDD.

В первом случае вы видите исключение NullPointerException, созданное рабочим, когда оно пытается получить доступ к объекту SparkContext, который присутствует только на драйвере, а не на рабочих.

Во втором случае моя догадка заключается в том, что работа выполнялась локально на драйвере и работала просто случайно.

Ответ 2

Его разумный вопрос, и я слышал, что он спросил его достаточно времени. Я собираюсь попытаться объяснить, почему это так, потому что это может помочь.

Вложенные RDD всегда будут генерировать исключение в процессе производства. Вложенные вызовы функций, как я думаю, вы описываете их здесь, если это означает вызов операции RDD внутри операции RDD, вызовет также сбои причин, поскольку на самом деле это одно и то же. (RDD являются неизменяемыми, поэтому выполнение операции RDD, такой как "карта", эквивалентно созданию нового RDD.) Способность создавать вложенные RDD является необходимым следствием того, как определяется RDD и как приложение Spark настроить.

RDD - это распределенная коллекция объектов (называемых разделами), которые живут на Excrors Spark. Исполнители Spark не могут общаться друг с другом, только с помощью драйвера Spark. Операции RDD вычисляются по частям на этих разделах. Поскольку среда-исполнитель RDD не является рекурсивной (т.е. Вы можете настроить драйвер Spark на исполнитель искры с суб-исполнителями), ни RDD не может быть.

В вашей программе вы создали распределенную коллекцию разделов целых чисел. Затем вы выполняете операцию сопоставления. Когда драйвер Spark видит операцию сопоставления, он отправляет инструкции для выполнения сопоставления исполнителям, которые выполняют преобразование на каждом разделе параллельно. Но ваше сопоставление не может быть выполнено, потому что на каждом разделе вы пытаетесь вызвать "весь RDD" для выполнения другой распределенной операции. Этого не может быть сделано, потому что каждый раздел не имеет доступа к информации на других разделах, если это так, вычисление не может выполняться параллельно.

Что вы можете сделать вместо этого, потому что данные, которые вам нужны на карте, вероятно, невелики (поскольку вы делаете фильтр, а фильтр не требует никакой информации о sessionIdList) - это сначала отфильтровать список идентификаторов сеанса. Затем соберите этот список с драйвером. Затем передайте его исполнителям, где вы можете использовать его на карте. Если список sessionID слишком велик, вам, вероятно, потребуется выполнить соединение.