Подтвердить что ты не робот

Найти максимальную строку для каждой группы в Spark DataFrame

Я пытаюсь использовать Spark dataframes вместо RDD, поскольку они выглядят более высокоуровневыми, чем RDD, и имеют тенденцию создавать более читаемый код.

В 14-узловой группе Google Dataproc у меня есть около 6 миллионов имен, которые переведены на идентификаторы двумя разными системами: sa и sb. Каждая Row содержит name, id_sa и id_sb. Моя цель - создать сопоставление от id_sa к id_sb, так что для каждого id_sa соответствующий id_sb является самым частым идентификатором среди всех имен, прикрепленных к id_sa.

Попробуем пояснить пример. Если у меня есть следующие строки:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]

Моя цель - создать сопоставление от a1 до b2. Действительно, имена, связанные с a1 - n1, n2 и n3, которые сопоставляются соответственно с b1, b2 и b2, поэтому b2 является наиболее частым отображением в именах, связанных с a1. Точно так же a2 будет отображаться на b2. Это нормально предположить, что всегда будет победитель: нет необходимости разорвать связи.

Я надеялся, что я могу использовать groupBy(df.id_sa) на моем фреймворке данных, но я не знаю, что делать дальше. Я надеялся на агрегацию, которая в конечном итоге могла бы произвести следующие строки:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]

Но, возможно, я пытаюсь использовать неправильный инструмент, и я должен просто вернуться к использованию RDD.

4b9b3361

Ответ 1

Использование join (это приведет к более чем одной строке в группе в случае связей):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col 

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

cnts.join(maxs, 
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))

Использование оконных функций (приведет к удалению связей):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)
  .select("id_sa", "id_sb"))

Использование упорядочения struct:

from pyspark.sql.functions import struct

(cnts
  .groupBy("id_sa")
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))

См. Также Как выбрать первую строку каждой группы?

Ответ 2

Я думаю, что вы можете искать функции окна: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Вот пример в Scala (сейчас у меня нет Spark Shell с Hive, поэтому я не смог протестировать код, но я думаю, что он должен работать):

case class MyRow(name: String, id_sa: String, id_sb: String)

val myDF = sc.parallelize(Array(
    MyRow("n1", "a1", "b1"),
    MyRow("n2", "a1", "b2"),
    MyRow("n3", "a1", "b2"),
    MyRow("n1", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")

import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)

myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb")

Вероятно, есть более эффективные способы достижения тех же результатов с функциями Window, но я надеюсь, что это указывает на то, что вы в правильном направлении.