Я пытаюсь сравнить разные способы агрегирования моих данных.
Это мои входные данные с двумя элементами (страница, посетитель):
(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
Работа с командой SQL в Spark SQL с помощью этого кода:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
"""select page
,count(distinct visitor) as visitor
from logs
group by page
""")
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
Я получаю этот вывод:
(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors
Теперь я хотел бы получить тот же результат, используя Dataframes и API-интерфейс thiers, но я не могу получить тот же результат:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)
Фактически, это то, что я получаю как вывод:
[PAG1,8] // just the simple page count for every page
[PAG2,4]
Это, наверное, что-то немое, но я не вижу его прямо сейчас.
Спасибо заранее!
FF