Подтвердить что ты не робот

Создание пользовательской функции в Spark-SQL

Я новичок в искру и искрах sql, и я пытался запросить некоторые данные, используя искру SQL.

Мне нужно забрать месяц с даты, указанной в виде строки.

Я думаю, что невозможно запросить месяц непосредственно из sparkqsl, поэтому я думал написать пользовательскую функцию в scala.

Можно ли написать udf в sparkSQL и, если возможно, кто-нибудь может предложить лучший способ записи udf.

Пожалуйста, помогите

4b9b3361

Ответ 1

Вы можете сделать это, по крайней мере, для фильтрации, если хотите использовать интегрированный язык.

Для файла данных date.txt, содержащего:

one,2014-06-01
two,2014-07-01
three,2014-08-01
four,2014-08-15
five,2014-09-15

В своем UDF вы можете упаковать как можно больше магии даты Scala, но я буду держать ее просто:

def myDateFilter(date: String) = date contains "-08-"

Задайте все, как показано ниже: это много из Руководство по программированию.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// case class for your records
case class Entry(name: String, when: String)

// read and parse the data
val entries = sc.textFile("dates.txt").map(_.split(",")).map(e => Entry(e(0),e(1)))

Вы можете использовать UDF как часть вашего предложения WHERE:

val augustEntries = entries.where('when)(myDateFilter).select('name, 'when)

и посмотреть результаты:

augustEntries.map(r => r(0)).collect().foreach(println)

Обратите внимание на версию метода where, который я использовал, объявленную в документе doc:

def where[T1](arg1: Symbol)(udf: (T1) ⇒ Boolean): SchemaRDD

Итак, UDF может принимать только один аргумент, но вы можете составить несколько вызовов .where() для фильтрации по нескольким столбцам.

Изменить для Spark 1.2.0 (и действительно 1.1.0 тоже)

Пока он не документирован, Spark теперь поддерживает регистрацию UDF, поэтому он может быть запрошен с SQL.

Вышеуказанный UDF можно зарегистрировать, используя:

sqlContext.registerFunction("myDateFilter", myDateFilter)

и если таблица была зарегистрирована

sqlContext.registerRDDAsTable(entries, "entries")

он может быть запрошен с помощью

sqlContext.sql("SELECT * FROM entries WHERE myDateFilter(when)")

Подробнее см. этот пример.

Ответ 2

В PySpark 1.5 и выше мы можем легко достичь этого с помощью встроенной функции.

Ниже приведен пример:

raw_data = 
[

("2016-02-27 23:59:59", "Gold", 97450.56),

("2016-02-28 23:00:00", "Silver", 7894.23),

("2016-02-29 22:59:58", "Titanium", 234589.66)]


Time_Material_revenue_df  = 
sqlContext.createDataFrame(raw_data, ["Sold_time", "Material", "Revenue"])

from pyspark.sql.functions import  *

Day_Material_reveneu_df = Time_Material_revenue_df.select(to_date("Sold_time").alias("Sold_day"), "Material", "Revenue")

Ответ 3

В Spark 2.0 вы можете сделать это:

// define the UDF
def convert2Years(date: String) = date.substring(7, 11)
// register to session
sparkSession.udf.register("convert2Years", convert2Years(_: String))
val moviesDf = getMoviesDf // create dataframe usual way
moviesDf.createOrReplaceTempView("movies") // 'movies' is used in sql below
val years = sparkSession.sql("select convert2Years(releaseDate) from movies")