Подтвердить что ты не робот

Ошибка кодирования при попытке сопоставить строку dataframe с обновленной строкой

Когда я пытаюсь сделать то же самое в своем коде, как указано ниже

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

Я взял приведенную выше ссылку здесь: Scala: как заменить значение в Dataframs с помощью scala Но я получаю ошибку кодировщика как

Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, S tring и т.д.) И Типы продуктов (классы case) поддерживаются импорт spark.im plicits._ Поддержка сериализации других типов будет быть добавлен в будущих выпусках.

Примечание. Я использую искру 2.0!

4b9b3361

Ответ 1

Здесь нет ничего неожиданного. Вы пытаетесь использовать код, который был написан с помощью Spark 1.x и больше не поддерживается в Spark 2.0:

  • в 1.x DataFrame.map есть ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • в 2.x Dataset[Row].map есть ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

Честно говоря, это не имеет особого смысла в 1.x. Независимо от версии вы можете просто использовать API DataFrame:

import org.apache.spark.sql.functions.{when, lower}

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))

Если вы действительно хотите использовать map, вы должны использовать статически типизированный Dataset:

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

или, по крайней мере, вернуть объект, который будет иметь неявный кодировщик:

df.map {
  case Row(year: Int, make: String, model: String) => 
    (year, if(make.toLowerCase == "tesla") "S" else make, model)
}

Наконец, если для некоторой полностью сумасшедшей причины, которую вы действительно хотите отобразить над Dataset[Row], вам необходимо предоставить требуемый кодер:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
    Row(year, "S", model)
  case row => row
} (encoder)

Ответ 2

Для сценария, где схема данных данных заранее известна, заданный @zero323, является решением

но для сценария с динамической схемой/или передачи нескольких данных в общую функцию: Следующий код работал для нас при переносе с 1.6.1 из 2.2.0

import org.apache.spark.sql.Row

val df = Seq(
   (2012, "Tesla", "S"), (1997, "Ford", "E350"),
   (2015, "Chevy", "Volt")
 ).toDF("year", "make", "model")

val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

этот код выполняется на обеих версиях искры.

Недостаток: оптимизация с помощью искры на dataframe/datasets api не будет применяться.