Подтвердить что ты не робот

UnresolvedException: Недействительный вызов метода dataType для неразрешенного объекта при использовании DataSet, построенного из Seq.empty (начиная с Spark 2.3.0)

Следующий снипп отлично работает в Spark 2.2.1, но дает довольно загадочное исключение во время выполнения Spark 2.3.0:

import sparkSession.implicits._
import org.apache.spark.sql.functions._

case class X(xid: Long, yid: Int)
case class Y(yid: Int, zid: Long)
case class Z(zid: Long, b: Boolean)

val xs = Seq(X(1L, 10)).toDS()
val ys = Seq(Y(10, 100L)).toDS()
val zs = Seq.empty[Z].toDS()

val j = xs
  .join(ys, "yid")
  .join(zs, Seq("zid"), "left")
  .withColumn("BAM", when('b, "B").otherwise("NB"))

j.show()

В Spark 2.2.1 он печатает на консоли

+---+---+---+----+---+
|zid|yid|xid|   b|BAM|
+---+---+---+----+---+
|100| 10|  1|null| NB|
+---+---+---+----+---+

В Spark 2.3.0 это приводит к:

org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'BAM
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
  ...

Кажется, что виновником является Dataset, созданный из пустого Seq[Z]. Когда вы меняете это на то, что также приведет к пусту Dataset[Z] оно работает как в Spark 2.2.1, например

val zs = Seq(Z(10L, true)).toDS().filter('zid === 999L)

В руководстве по миграции от 2.2 до 2.3 указано:

Начиная с Spark 2.3, детерминированные предикаты Join/Filters, которые после первых недетерминированных предикатов также, если это возможно, также сдвигаются вниз/через дочерние операторы. В предыдущих версиях Spark эти фильтры не имеют права на преданное нажатие.

Связана ли эта ошибка или (известная) ошибка?

4b9b3361

Ответ 1

@user9613318 есть ошибка, созданная OP, но закрывается как "Невозможно воспроизвести", потому что разработчик говорит, что

Я не могу воспроизвести текущий мастер. Это должно быть исправлено.

но нет ссылки на другой основной вопрос, чтобы он оставался загадкой.

Ответ 2

Я работал над этим на 2.3.0, someEmptyDataset.cache() сразу после создания пустого Dataset. Пример OP больше не zs.cache() неудачу, как это (с zs.cache()), и реальная проблема на работе ушла с этим трюком.

(Как примечание, код OP-s не выходит из строя на Spark 2.3.2, который запускается локально. Хотя я не вижу связанного исправления в журнале изменений 2.3.2, возможно, это связано с некоторыми другими различиями в среда...)