Подтвердить что ты не робот

Как разбить фрейм данных на dataframes с одинаковыми значениями столбцов?

Используя Scala, как я могу разделить dataFrame на несколько dataFrame (будь то массив или коллекция) с одинаковым значением столбца. Например, я хочу разбить следующий DataFrame:

ID  Rate    State
1   24  AL
2   35  MN
3   46  FL
4   34  AL
5   78  MN
6   99  FL

в

набор данных 1

ID  Rate    State
1   24  AL  
4   34  AL

набор данных 2

ID  Rate    State
2   35  MN
5   78  MN

набор данных 3

ID  Rate    State
3   46  FL
6   99  FL
4b9b3361

Ответ 1

Вы можете собирать уникальные значения состояния и просто отображать результирующий массив:

val states = df.select("State").distinct.collect.flatMap(_.toSeq)
val byStateArray = states.map(state => df.where($"State" <=> state))

или для отображения:

val byStateMap = states
    .map(state => (state -> df.where($"State" <=> state)))
    .toMap

То же самое в Python:

from itertools import chain
from pyspark.sql.functions import col

states = chain(*df.select("state").distinct().collect())

# PySpark 2.3 and later
# In 2.2 and before col("state") == state) 
# should give the same outcome, ignoring NULLs 
# if NULLs are important 
# (lit(state).isNull() & col("state").isNull()) | (col("state") == state)
df_by_state = {state: 
  df.where(col("state").eqNullSafe(state)) for state in states}

Очевидная проблема заключается в том, что для каждого уровня требуется полное сканирование данных, поэтому это дорогостоящая операция. Если вы ищете способ просто разделить выход, см. Также Как разделить RDD на два или более RDD?

В частности, вы можете написать Dataset, разделенный на интересующий столбец:

val path: String = ???
df.write.partitionBy("State").parquet(path)

и при необходимости верните сообщение:

// Depend on partition prunning
for { state <- states } yield spark.read.parquet(path).where($"State" === state)

// or explicitly read the partition
for { state <- states } yield spark.read.parquet(s"$path/State=$state")

В зависимости от размера данных, количества уровней разделения, хранения и уровня персистентности ввода он может быть быстрее или медленнее, чем несколько фильтров.

Ответ 2

Это очень просто (если искробезопасная версия равна 2), если вы делаете dataframe в качестве временной таблицы.

df1.createOrReplaceTempView("df1")

И теперь вы можете делать запросы,

var df2 = spark.sql("select * from df1 where state = 'FL'")
var df3 = spark.sql("select * from df1 where state = 'MN'")
var df4 = spark.sql("select * from df1 where state = 'AL'")

Теперь вы получили df2, df3, df4. Если вы хотите использовать их в качестве списка, вы можете использовать

df2.collect()
df3.collect()

или даже функцию отображения/фильтра. Пожалуйста, обратитесь https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes

Зола