Подтвердить что ты не робот

Spark: как перевести счетчик (отдельный (значение)) в API Dataframe

Я пытаюсь сравнить разные способы агрегирования моих данных.

Это мои входные данные с двумя элементами (страница, посетитель):

(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)

Работа с командой SQL в Spark SQL с помощью этого кода:

import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
                              """select page
                                       ,count(distinct visitor) as visitor
                                   from logs
                               group by page
                              """)
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)

Я получаю этот вывод:

(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors

Теперь я хотел бы получить тот же результат, используя Dataframes и API-интерфейс thiers, но я не могу получить тот же результат:

import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)

Фактически, это то, что я получаю как вывод:

[PAG1,8]  // just the simple page count for every page
[PAG2,4]

Это, наверное, что-то немое, но я не вижу его прямо сейчас.

Спасибо заранее!

FF

4b9b3361

Ответ 1

Вам нужна функция агрегации DataFrame countDistinct:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

case class Log(page: String, visitor: String)

val logs = data.map(p => Log(p._1,p._2))
            .toDF()

val result = logs.select("page","visitor")
            .groupBy('page)
            .agg('page, countDistinct('visitor))

result.foreach(println)

Ответ 2

Вы можете использовать команду dataframe groupBy дважды, чтобы сделать это. Здесь df1 - ваш исходный ввод.

val df2 = df1.groupBy($"page",$"visitor").agg(count($"visitor").as("count"))

Эта команда выдаст следующий результат:

page  visitor  count
----  ------   ----
PAG2    V2       2
PAG1    V3       1
PAG1    V1       5
PAG1    V2       2
PAG2    V1       2

Затем снова используйте команду groupBy, чтобы получить окончательный результат.

 df2.groupBy($"page").agg(count($"visitor").as("count"))

Окончательный вывод:

page   count
----   ----
PAG1    3
PAG2    2

Ответ 3

Я думаю, что в новых версиях Spark это проще. Следующее проверено с 2.4.0. 1. Сначала создайте массив для образца.

val myArr = Array(
      ("PAG1","V1"),
      ("PAG1","V1"),
      ("PAG2","V1"),
      ("PAG2","V2"),
      ("PAG2","V1"),
      ("PAG1","V1"),
      ("PAG1","V2"),
      ("PAG1","V1"),
      ("PAG1","V2"),
      ("PAG1","V1"),
      ("PAG2","V2"),
      ("PAG1","V3")
    )

2. Создать фрейм данных

val logs = spark.createDataFrame(myArr)
    .withColumnRenamed("_1","page")
    .withColumnRenamed("_2","visitor")

3. Теперь агрегация с функциейручной sql perfectCount

import org.apache.spark.sql.{functions => F}
logs.groupBy("page").agg(
    F.countDistinct("visitor").as("visitor"))
    .show()

4. Ожидаемый результат:

+----+-------+
|page|visitor|
+----+-------+
|PAG1|      3|
|PAG2|      2|
+----+-------+