Подтвердить что ты не робот

Операции с несколькими агрегатами в одном столбце блока данных искробезопасности

У меня есть три массива типа string, содержащие следующую информацию:

  • arrayBy array: содержащие имена столбцов, в которые я хочу сгруппировать свои данные.
  • совокупный массив: содержащий имена столбцов, которые я хочу заполнить.
  • массив операций: содержащий агрегированные операции, которые я хочу выполнить

Я пытаюсь использовать световые кадры для достижения этого. Исходные кадры Spark предоставляют agg(), где вы можете передать Map [String, String] (имя столбца и соответствующую агрегированную операцию) в качестве ввода, однако я хочу выполнить различные операции агрегации в одном столбце данных. Любые предложения о том, как достичь этого?

4b9b3361

Ответ 1

Scala:

Например, вы можете отобразить список функций с определенным mapping от имени к функции:

import org.apache.spark.sql.functions.{col, min, max, mean}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+

или же

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show

К сожалению, синтаксический анализатор, который используется внутри SQLContext, не SQLContext публично, но вы всегда можете попробовать создать простые SQL-запросы:

df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")

Python:

from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"] 
funs = [mean, sum, max]

exprs = [f(col(c)) for f in funs for c in aggregate]

# or equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)

Смотрите также:

Ответ 2

Для тех, кто интересуется, как ответ @zero323 может быть написан без понимания списка в python:

from pyspark.sql.functions import min, max, col
# init your spark dataframe

expr = [min(col("valueName")),max(col("valueName"))]
df.groupBy("keyName").agg(*expr)

Ответ 3

case class soExample(firstName: String, lastName: String, Amount: Int)
val df =  Seq(soExample("me", "zack", 100)).toDF

import org.apache.spark.sql.functions._

val groupped = df.groupBy("firstName", "lastName").agg(
     sum("Amount"),
     mean("Amount"), 
     stddev("Amount"),
     count(lit(1)).alias("numOfRecords")
   ).toDF()

display(groupped)

//Вежливость Зак..

Зак упростил ответ для поста с пометкой Duplicate Spark Scala Data Frame, чтобы иметь множественную агрегацию одной группы By