Подтвердить что ты не робот

Определение UDF, который принимает массив объектов в Spark DataFrame?

При работе с Spark DataFrames для отображения данных в столбцах требуются пользовательские функции (UDF). UDF требуют, чтобы типы аргументов были явно указаны. В моем случае мне нужно манипулировать столбцом, состоящим из массивов объектов, и я не знаю, какой тип использовать. Вот пример:

import sqlContext.implicits._

// Start with some data. Each row (here, there only one row) 
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
  """
  |{
  |  "topic" : "pets",
  |  "subjects" : [
  |    {"type" : "cat", "score" : 10},
  |    {"type" : "dog", "score" : 1}
  |  ]
  |}
  """)))

Сравнительно просто использовать встроенный org.apache.spark.sql.functions для выполнения основных операций с данными в столбцах

import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show

+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets|             2|
+-----+--------------+

и обычно легко писать пользовательские UDF для выполнения произвольных операций

import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show 

+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
|      PETS|             2|
+----------+--------------+

Но что, если я хочу использовать UDF для управления массивом объектов в столбце "subject"? Какой тип я использую для аргумента в UDF? Например, если я хочу переопределить функцию размера, вместо того, чтобы использовать тот, который предоставляется искровой:

val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show

Ясно, что Array[Something] не работает... какой тип я должен использовать!? Должен ли я вообще Array[]? Выкалывание говорит мне, что scala.collection.mutable.WrappedArray может иметь какое-то отношение к нему, но все же есть другой тип, который мне нужно предоставить.

4b9b3361

Ответ 1

То, что вы ищете, это Seq[oassql.Row]:

import org.apache.spark.sql.Row

val my_size = udf { subjects: Seq[Row] => subjects.size }

Пояснение:

  • Как вы уже знаете, текущим представлением ArrayType является WrappedArray поэтому Array работать не будет, и лучше оставаться в безопасности.
  • Локальный тип для StructType - Row. К сожалению, это означает, что доступ к отдельным полям небезопасен.

Примечания:

  • Чтобы создать struct в Spark <2.3, функция, переданная в udf, должна возвращать тип Product (Tuple* или case class), а не Row. Это потому, что соответствующие варианты udf зависят от отражения Scala:

    Определяет закрытие Scala из n аргументов как пользовательскую функцию (UDF). Типы данных автоматически выводятся на основании подписи закрытия Scala.

  • В Spark> = 2.3 можно напрямую возвращать Row, если указана схема.

    def udf(f: AnyRef, dataType: DataType): UserDefinedFunction Определяет детерминированную пользовательскую функцию (UDF) с использованием замыкания Scala. Для этого варианта вызывающая сторона должна указать тип выходных данных, и нет автоматического приведения типа ввода.

    Смотрите, например, Как создать Spark UDF в Java/Kotlin, который возвращает сложный тип? ,