Подтвердить что ты не робот

Как добавить постоянный столбец идентификаторов строк в Spark DataFrame?

Этот вопрос не нова, однако я нахожу удивительное поведение в Spark. Мне нужно добавить столбец идентификаторов строк в DataFrame. Я использовал метод DataFrame monotonically_increasing_id(), и он дает мне дополнительный код уникальных идентификаторов строк (которые НЕ являются последовательными, но уникальны).

Проблема, с которой я столкнулась, заключается в том, что при фильтрации DataFrame идентификаторы строк в результирующем DataFrame повторно назначаются. Ниже приведены два DataFrames.

  • первый - это исходный DataFrame с идентификаторами строк, добавленными следующим образом:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
  • второй DataFrame - это тот, который был получен после фильтрации на col P через df.filter(col("P")).

Проблема проиллюстрирована rowId для custId 169, которая была 5 в исходном DataFrame, но после фильтрации эта строкаId (5) была повторно назначена custmId 773, когда custId 169 был отфильтрован! Я не знаю, почему это поведение по умолчанию.

Я хотел бы, чтобы rowIds был "липким"; если я удаляю строки из DataFrame, я не хочу, чтобы их идентификаторы "повторно использовались", я хочу, чтобы они ушли вместе со своими строками. Можно ли это сделать? Я не вижу никаких флагов для запроса этого поведения из метода monotonically_increasing_id.

+---------+--------------------+-------+
| custId  |    features|    P  |rowId|
+---------+--------------------+-------+
|806      |[50,5074,...|   true|    0|
|832      |[45,120,1...|   true|    1|
|216      |[6691,272...|   true|    2|
|926      |[120,1788...|   true|    3|
|875      |[54,120,1...|   true|    4|
|169      |[19406,21...|  false|    5|

after filtering on P:
+---------+--------------------+-------+
|   custId|    features|    P  |rowId|
+---------+--------------------+-------+
|      806|[50,5074,...|   true|    0|
|      832|[45,120,1...|   true|    1|
|      216|[6691,272...|   true|    2|
|      926|[120,1788...|   true|    3|
|      875|[54,120,1...|   true|    4|
|      773|[3136,317...|   true|    5|
4b9b3361

Ответ 1

Spark 2.0

  • Это проблема была решена в Spark 2.0 с SPARK-14241.

  • Другая аналогичная проблема была решена в Spark 2.1 с SPARK-14393

Искра 1.x

Проблема, которую вы испытываете, довольно тонкая, но может быть сведена к простому факту monotonically_increasing_id - чрезвычайно уродливая функция. Это явно не чисто, и его ценность зависит от того, что полностью не контролируется.

Он не принимает никаких параметров, поэтому с точки зрения оптимизатора это не имеет значения, когда он вызывается и может быть нажат после всех других операций. Отсюда и поведение, которое вы видите.

Если вы посмотрите на код, вы обнаружите, что это явно отмечено расширением выражения MonotonicallyIncreasingID с помощью Nondeterministic.

<ы > Я не думаю, что есть какое-то изящное решение, но один из способов справиться с этим - добавить искусственную зависимость от отфильтрованного значения. Например, с помощью UDF, например:

from pyspark.sql.types import LongType
from pyspark.sql.functions import udf

bound = udf(lambda _, v: v, LongType()) 

(df
  .withColumn("rn", monotonically_increasing_id())
  # Due to nondeterministic behavior it has to be a separate step
  .withColumn("rn", bound("P", "rn"))  
  .where("P"))

В общем случае было бы проще добавлять индексы с помощью zipWithIndex на RDD, а затем преобразовать его обратно в DataFrame.


* Обходной путь, показанный выше, уже не является допустимым решением (и не требуется) в Spark 2.x, где UDF Python подвержены оптимизации плана выполнения.

Ответ 2

Я не мог воспроизвести это. Я использую Spark 2.0, хотя, возможно, поведение изменилось, или я не делаю то же, что и вы.

val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true))
.toDF("name", "value","flag")
.withColumn("rowd", monotonically_increasing_id())

df.show

val df2 = df.filter(col("flag")=== true)

df2.show

df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
+-----+-----+-----+----+
| name|value| flag|rowd|
+-----+-----+-----+----+
|  one|    1| true|   0|
|  two|    2|false|   1|
|three|    3| true|   2|
| four|    4| true|   3|
+-----+-----+-----+----+
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
+-----+-----+----+----+
| name|value|flag|rowd|
+-----+-----+----+----+
|  one|    1|true|   0|
|three|    3|true|   2|
| four|    4|true|   3|
+-----+-----+----+----+

Ответ 3

Я недавно работал над аналогичной проблемой. Хотя monotonically_increasing_id() очень быстрый, он ненадежен и не даст вам последовательных номеров строк, а только увеличит уникальные целые числа.

Создание раздела Windows с последующим использованием row_number().over(some_windows_partition) занимает очень много времени.

Наилучшим решением на данный момент является использование zip с индексом, а затем преобразование zip файла обратно в исходный фрейм данных с новой схемой, включающей столбец индекса.

Попробуй это:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))

Где original_dataframe это dataframe вы должны добавить индекс и row_with_index является новой схемой с индексом столбца, который вы можете написать, как

row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)

Здесь, calendar_date, year_week_number, year_period_number, и realization были столбцами моего исходного dataframe. Вы можете заменить имена именами ваших столбцов. Индекс - это новое имя столбца, которое вы должны были добавить для номеров строк.

Этот процесс значительно эффективнее и плавнее по сравнению с row_number().over(some_windows_partition).

Надеюсь это поможет.

Ответ 4

Чтобы обойти сдвигающую оценку monotonically_increasing_id(), вы можете попробовать записать фрейм данных на диск и перечитать. Тогда столбец id теперь просто поле данных, которое считывается, а не динамически вычисляется в какой-то момент в конвейере. Хотя это довольно уродливое решение, оно работало, когда я быстро проверил.

Ответ 5

Это сработало для меня. Создал другой столбец идентификаторов и использовал функцию окна row_number

import org.apache.spark.sql.functions.{row_number}
import org.apache.spark.sql.expressions.Window

val df1: DataFrame = df.withColumn("Id",lit(1))

df1
.select(
...,
row_number()
.over(Window
.partitionBy("Id"
.orderBy(col("...").desc))
)
.alias("Row_Nbr")
)

Ответ 6

Чтобы получить более высокую производительность по сравнению с решением Chris T, вы можете попытаться записать в разделяемый фрейм данных с поддержкой apache вместо записи на диск. https://ignite.apache.org/use-cases/spark/shared-memory-layer.html