Подтвердить что ты не робот

Агрегированная функция Использовать счетчик с помощью groupBy в Spark

Я пытаюсь сделать несколько операций в одной строке кода в pySpark, и не уверен, что это возможно для моего случая.

Мое намерение состоит не в том, чтобы сохранить вывод в качестве нового фрейма данных.

Мой текущий код довольно прост:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)

И я собираюсь добавить count() после использования groupBy, чтобы получить, ну, количество записей, соответствующих каждому значению столбца timePeriod, напечатанному\показанным как вывод.

При попытке использовать groupBy(..).count().agg(..) я получаю исключения.

Есть ли способ достичь как count(), так и agg().show(), без разделения кода на две строки команд, например.

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()

Или, еще лучше, для получения объединенного вывода на вывод agg.show() - дополнительный столбец, в котором указано количество записей, соответствующих значению строки. например:.

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315
4b9b3361

Ответ 1

count() может использоваться внутри agg(), поскольку выражение groupBy такое же.

С Python

import pyspark.sql.functions as func

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
  .groupBy("timePeriod")
  .agg(
     func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
     func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     func.count(func.lit(1)).alias("Num Of Records")
   )
  .show(20, False)

функции pySpark SQL doc

С Scala

import org.apache.spark.sql.functions._ //for count()

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

count(1) будет считать записи по первому столбцу, который равен count("timePeriod")

С Java

import static org.apache.spark.sql.functions.*;

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)