PySpark DataFrames - способ перечисления без преобразования на Pandas? - программирование
Подтвердить что ты не робот

PySpark DataFrames - способ перечисления без преобразования на Pandas?

У меня очень большой pyspark.sql.dataframe.DataFrame с именем df. Мне нужен какой-то способ перечисления записей, таким образом, доступ к записи с определенным индексом. (или выберите группу записей с диапазоном индексов)

В pandas я мог бы сделать только

indexes=[2,3,6,7] 
df[indexes]

Здесь я хочу нечто подобное (и без преобразования данных в pandas)

Ближайшим, к которому я могу добраться, является:

  • Перечисление всех объектов в исходном фрейме данных:

    indexes=np.arange(df.count())
    df_indexed=df.withColumn('index', indexes)
    
    • Поиск значений, которые мне нужны, с помощью функции where().

ВОПРОСЫ:

  • Почему это не работает и как заставить его работать? Как добавить строку в dataframe?
  • Будет ли работать позже, чтобы сделать что-то вроде:

     indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
    
  • Какой бы быстрый и простой способ справиться с этим?

4b9b3361

Ответ 1

Это не работает, потому что:

  • второй аргумент для withColumn должен быть Column не коллекцией. np.array здесь не будет работать.
  • когда вы передаете "index in indexes", поскольку выражение SQL для where indexes выходит за пределы области видимости и не разрешено как допустимый идентификатор

PySpark >= 1.4.0

Вы можете добавить номера строк, используя соответствующую функцию окна и запрос, используя метод Column.isin или правильно сформированную строку запроса:

from pyspark.sql.functions import col, rowNumber
from pyspark.sql.window import Window

w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))

# Using DSL
indexed.where(col("index").isin(set(indexes)))

# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))

Похоже, что функции окна, называемые без предложения PARTITION BY, перемещают все данные в один раздел, поэтому выше может быть не лучшим решением в конце концов.

Любой более быстрый и простой способ справиться с этим?

Не совсем. Spark DataFrames не поддерживает случайный доступ к строкам.

PairedRDD можно получить с помощью метода lookup, который является относительно быстрым, если данные разбиты на разделы с помощью HashPartitioner. Существует также indexed-rdd проект, который поддерживает эффективный поиск.

Edit

Независимо от версии PySpark вы можете попробовать что-то вроде этого:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

row = Row("char")
row_with_index = Row("char", "index")

df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)

## +----+
## |char|
## +----+
## |   a|
## |   b|
## |   c|
## |   d|
## |   e|
## +----+
## only showing top 5 rows

# This part is not tested but should work and save some work later
schema  = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)])

indexed = (df.rdd # Extract rdd
    .zipWithIndex() # Add index
    .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
    .toDF(schema)) # It will work without schema but will be more expensive

# inSet in Spark < 1.3
indexed.where(col("index").isin(indexes))

Ответ 2

Если вам нужен диапазон номеров, который гарантированно не столкнется, но не требует .over(partitionBy()), вы можете использовать monotonicallyIncreasingId().

from pyspark.sql.functions import monotonicallyIncreasingId
df.select(monotonicallyIncreasingId().alias("rowId"),"*")

Обратите внимание, что значения не особенно "опрятные". Каждому разделу присваивается диапазон значений, и вывод не будет смежным. Например. 0, 1, 2, 8589934592, 8589934593, 8589934594.

Это было добавлено в Spark 28 апреля 2015 года здесь: https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2

Ответ 3

Конечно, вы можете добавить массив для индексирования, массив по вашему выбору: В Scala сначала нам нужно создать индексный массив:

val index_array=(1 to df.count.toInt).toArray

index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

Теперь вы можете добавить этот столбец в свой DF. Во-первых, для этого вам нужно открыть наш DF и получить его как массив, а затем застегнуть его с помощью index_array, а затем преобразовать новый массив в RDD. Последний шаг - получить его как DF:

final_df = sc.parallelize((df.collect.map(
    x=>(x(0),x(1))) zip index_array).map(
    x=>(x._1._1.toString,x._1._2.toString,x._2))).
    toDF("column_name")

После этого индексирование станет более понятным.

Ответ 4

monotonicallyIncreasingId() - это назначит номера строк в порядке incresing, но не в последовательности.

выборка с двумя столбцами:

|---------------------|------------------| | RowNo | Heading 2 | |---------------------|------------------| | 1 | xy | |---------------------|------------------| | 12 | xz | |---------------------|------------------|

Если вы хотите присвоить номера строк, используйте следующий трюк.

Протестировано в версиях искры 2.0.1 и более.

df.createOrReplaceTempView("df") dfRowId = spark.sql("select *, row_number() over (partition by 0) as rowNo from df")

выборка с двумя столбцами:

|---------------------|------------------| | RowNo | Heading 2 | |---------------------|------------------| | 1 | xy | |---------------------|------------------| | 2 | xz | |---------------------|------------------|

Надеюсь, что это поможет.