Подтвердить что ты не робот

Обновление столбца dataframe в искровом режиме

Взглянув на новую диаграмму api-диаграммы искры, неясно, можно ли изменять столбцы данных.

Как мне изменить значение в строке x столбца y в кадре данных?

В pandas это будет df.ix[x,y] = new_value

Изменить. Объединив сказанное ниже, вы не можете изменять существующий фреймворк данных, поскольку он является неизменным, но вы можете вернуть новый фреймворк с желаемыми изменениями.

Если вы просто хотите заменить значение в столбце на основе условия, например np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

Если вы хотите выполнить некоторую операцию над столбцом и создать новый столбец, который добавлен в фреймворк данных:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

Если вы хотите, чтобы новый столбец имел то же имя, что и старый столбец, вы можете добавить дополнительный шаг:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
4b9b3361

Ответ 1

Пока вы не можете изменить столбец как таковой, вы можете работать с столбцом и возвращать новый DataFrame, отражающий это изменение. Для этого вы должны сначала создать UserDefinedFunction реализацию применяемой операции, а затем выборочно применить эту функцию только к целевому столбцу. В Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df теперь имеет ту же схему, что и old_df (предполагая, что old_df.target_column имеет тип StringType), но все значения в столбце target_column будут new_value.

Ответ 2

Обычно при обновлении столбца мы хотим сопоставить старое значение с новым значением. Здесь можно сделать это в pyspark без UDF's:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).

Ответ 3

DataFrames основаны на RDD. RDD являются неизменяемыми структурами и не позволяют обновлять элементы на месте. Чтобы изменить значения, вам нужно будет создать новый DataFrame, преобразовывая исходный либо с помощью SQL-подобных операций DSL или RDD, таких как map.

Рекомендуемая слайд-панель: Представление DataFrames в Spark для крупномасштабных научных исследований.

Ответ 4

Так же, как maasg говорит, что вы можете создать новый DataFrame из результата карты, примененной к старым DataFrame. Пример для данного DataFrame df с двумя строками:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

Обратите внимание, что если типы столбцов меняются, вам нужно указать правильную схему вместо df.schema. Проверьте api org.apache.spark.sql.Row на доступные методы: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Обновить] Или используя UDF в Scala:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

и если имя столбца должно оставаться неизменным, вы можете переименовать его:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")