Подтвердить что ты не робот

Почему "невозможно найти кодировщик для типа, хранящегося в наборе данных" при создании набора данных пользовательского класса case?

Spark 2.0 (окончательный) с Scala 2.11.8. Следующий суперпростой код дает ошибку компиляции Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.
      master("local")
      .appName("example")
      .getOrCreate()

    val dataset = sparkSession.createDataset(dataList)
  }
}
4b9b3361

Ответ 1

Spark Datasets требует Encoders для типа данных, который должен быть сохранен. Для обычных типов (атоматика, типы продуктов) имеется ряд предопределенных кодеров, но вы должны импортировать их из SparkSession.implicits, чтобы сделать это работа:

val sparkSession: SparkSession = ???
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)

Дальнейшее чтение:

Ответ 2

Для других пользователей (ваш правильный), обратите внимание, что вам также важно, чтобы case class был определен вне области object. Итак:

Не удается:

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()
    val dataset = sparkSession.createDataset(dataList)
  }
}

Добавьте implicits, все еще не с той же ошибкой:

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}

Работает:

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {   
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}

Здесь соответствующая ошибка: https://issues.apache.org/jira/browse/SPARK-13540, поэтому, надеюсь, она будет исправлена ​​в следующей версии Spark 2.

(Edit: похоже, что исправление действительно находится в Spark 2.0.0... Поэтому я не уверен, почему это все еще не удается).

Ответ 3

Я бы уточнил с ответом на мой собственный вопрос, что если цель состоит в том, чтобы определить простой литерал SparkData, а не использовать кортежи Scala и неявное преобразование, более простой путь - использовать Spark API напрямую, как это:

  import org.apache.spark.sql._
  import org.apache.spark.sql.types._
  import scala.collection.JavaConverters._

  val simpleSchema = StructType(
    StructField("a", StringType) ::
    StructField("b", IntegerType) ::
    StructField("c", IntegerType) ::
    StructField("d", IntegerType) ::
    StructField("e", IntegerType) :: Nil)

  val data = List(
    Row("001", 1, 0, 3, 4),
    Row("001", 3, 4, 1, 7),
    Row("001", null, 0, 6, 4),
    Row("003", 1, 4, 5, 7),
    Row("003", 5, 4, null, 2),
    Row("003", 4, null, 9, 2),
    Row("003", 2, 3, 0, 1)
  )

  val df = spark.createDataFrame(data.asJava, simpleSchema)