Подтвердить что ты не робот

Использование метода monotonically_increasing_id() для присвоения номера строки pyspark dataframe

Я использую monotonically_increasing_id(), чтобы присвоить номер строки pyspark dataframe, используя синтаксис ниже:

df1 = df1.withColumn("idx", monotonically_increasing_id())

Теперь df1 имеет 26 572 528 записей. Поэтому я ожидал значения idx от 0 до 2672,727.

Но когда я выбираю max (idx), его значение странно огромно: 335 008 054 165.

Что происходит с этой функцией? можно ли использовать эту функцию для слияния с другим набором данных, имеющим аналогичное количество записей?

У меня есть около 300 фреймов данных, которые я хочу объединить в единый блок данных. Таким образом, один фрейм данных содержит идентификаторы, а другие содержат разные записи, соответствующие им по ряду строк

4b9b3361

Ответ 1

Из документации

Столбец, который генерирует монотонно увеличивающиеся 64-битные целые числа.

Созданный идентификатор гарантированно будет монотонно увеличивающимся и уникальным, но не последовательным. Текущая реализация помещает идентификатор раздела в верхние 31 бит, а номер записи в каждом разделе - в младшие 33 бита. Предполагается, что во фрейме данных менее 1 миллиарда разделов, а в каждом разделе менее 8 миллиардов записей.

Таким образом, он не похож на автоматическое приращение идентификатора в РБД и не является надежным для слияния.

Если вам нужно поведение автоинкремента, как в RDB, и ваши данные сортируются, то вы можете использовать row_number

df.createOrReplaceTempView('df')
spark.sql('select row_number() over (order by "some_column") as num, * from df')
+---+-----------+
|num|some_column|
+---+-----------+
|  1|   ....... |
|  2|   ....... |
|  3| ..........|
+---+-----------+

Если ваши данные не сортируются, и вы не возражаете против использования rdds для создания индексов, а затем возвращаетесь к фреймам данных, вы можете использовать rdd.zipWithIndex()

Пример можно найти здесь

Короче:

# since you have a dataframe, use the rdd interface to create indexes with zipWithIndex()
df = df.rdd.zipWithIndex()
# return back to dataframe
df = df.toDF()

df.show()

# your data           | indexes
+---------------------+---+
|         _1          | _2| 
+-----------=---------+---+
|[data col1,data col2]|  0|
|[data col1,data col2]|  1|
|[data col1,data col2]|  2|
+---------------------+---+

После этого вам, вероятно, понадобится еще несколько преобразований, чтобы привести ваш фрейм данных к тому, что вам нужно. Примечание: не очень эффективное решение.

Надеюсь это поможет. Удачи!

Edit: Давай думать об этом, вы можете объединить monotonically_increasing_id использовать row_number:

# create a monotonically increasing id 
df = df.withColumn("idx", monotonically_increasing_id())

# then since the id is increasing but not consecutive, it means you can sort by it, so you can use the 'row_number'
df.createOrReplaceTempView('df')
new_df = spark.sql('select row_number() over (order by "idx") as num, * from df')

Не уверен насчет производительности, хотя.

Ответ 2

используя функции api, вы можете просто сделать следующее:

from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
df1 = df1.withColumn("idx", F.monotonically_increasing_id())
windowSpec = W.orderBy("idx")
df1.withColumn("idx", F.row_number().over(windowSpec)).show()

Я надеюсь, что ответ будет полезен

Ответ 3

Я нашел решение @mkaran полезным, но для меня не было столбца порядка при использовании оконной функции. Я хотел сохранить порядок строк данных в качестве их индексов (то, что вы увидите в панде). Следовательно, решение в разделе редактирования пригодилось. Поскольку это хорошее решение (если производительность не является проблемой), я хотел бы поделиться им в качестве отдельного ответа.

# Add a increasing data column 
df_index = df.withColumn("idx", monotonically_increasing_id())

# Create the window specification
w = Window.orderBy("idx")

# Use row number with the window specification
df_index = df_index.withColumn("index", F.row_number().over(w))

# Drop the created increasing data column
df2_index = df2_index.drop("idx")

df - ваш оригинальный df_index а df_index - новый фрейм данных.