Подтвердить что ты не робот

Составная функция PySpark

В качестве упрощенного примера у меня есть dataframe "df" с столбцами "col1, col2", и я хочу вычислить максимальный размер строки после применения функции к каждому столбцу:

def f(x):
    return (x+1)

max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())

df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))

Итак, если df:

col1   col2
1      2
3      0

Тогда

df2:

col1   col2  result
1      2     3
3      0     4

Вышеприведенное не работает и создает "Невозможно оценить выражение: PythonUDF # f..."

Я абсолютно уверен, что "f_udf" отлично работает на моем столе, и главная проблема связана с max_udf.

Не создавая дополнительных столбцов или используя базовую карту/сокращение, существует ли способ сделать это полностью с использованием dataframes и udfs? Как мне изменить "max_udf"?

Я также пробовал:

max_udf=udf(max, IntegerType())

который производит ту же ошибку.

Я также подтвердил, что следующие работы:

df2=(df.withColumn("temp1", f_udf(df.col1))
       .withColumn("temp2", f_udf(df.col2))

df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2))

Почему я не могу сделать это за один раз?

Я хотел бы получить ответ, обобщающий любую функцию "f_udf" и "max_udf."

4b9b3361

Ответ 1

У меня была аналогичная проблема, и я нашел решение в ответе этого вопроса о стекеповерхности

Чтобы передать несколько столбцов или целую строку в UDF, используйте struct:

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType

df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))

count_empty_columns = udf(lambda row: len([x for x in row if x == None]), IntegerType())

new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns])))

new_df.show()

возвращает:

+----+----+----------+
|   a|   b|null_count|
+----+----+----------+
|null|null|         2|
|   1|null|         1|
|null|   2|         1|
+----+----+----------+

Ответ 2

UserDefinedFunction выдает ошибку при принятии UDF в качестве своих аргументов.

Вы можете изменить max_udf, как показано ниже, чтобы заставить его работать.

df = sc.parallelize([(1, 2), (3, 0)]).toDF(["col1", "col2"])

max_udf = udf(lambda x, y: max(x + 1, y + 1), IntegerType())

df2 = df.withColumn("result", max_udf(df.col1, df.col2))

Или

def f_udf(x):
    return (x + 1)

max_udf = udf(lambda x, y: max(x, y), IntegerType())
## f_udf=udf(f, IntegerType())

df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2)))

Примечание

Второй подход действителен тогда и только тогда, когда внутренние функции (здесь f_udf) генерируют правильные выражения SQL.

Здесь он работает, потому что f_udf(df.col1) и f_udf(df.col2) оцениваются как Column<b'(col1 + 1)'> и Column<b'(col2 + 1)'> соответственно, перед передачей на max_udf. Он не будет работать с произвольной функцией.

Это не сработает, если мы попробуем например что-то вроде этого:

from math import exp

df.withColumn("result", max_udf(exp(df.col1), exp(df.col2)))