sessionIdList
относится к типу:
scala> sessionIdList
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30
Когда я пытаюсь запустить приведенный ниже код:
val x = sc.parallelize(List(1,2,3))
val cartesianComp = x.cartesian(x).map(x => (x))
val kDistanceNeighbourhood = sessionIdList.map(s => {
cartesianComp.filter(v => v != null)
})
kDistanceNeighbourhood.take(1)
Я получаю исключение:
14/05/21 16:20:46 ERROR Executor: Exception in task ID 80
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:38)
at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
Однако, если я использую:
val l = sc.parallelize(List("1","2"))
val kDistanceNeighbourhood = l.map(s => {
cartesianComp.filter(v => v != null)
})
kDistanceNeighbourhood.take(1)
Тогда исключение не отображается
Разница между двумя фрагментами кода заключается в том, что в первом фрагменте sessionIdList имеет тип:
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30
и во втором фрагменте "l" имеет тип
scala> l
res13: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at <console>:12
Почему происходит эта ошибка?
Нужно ли конвертировать sessionIdList в ParallelCollectionRDD, чтобы это исправить?