Подтвердить что ты не робот

Как агрегировать значения в коллекции после groupBy?

У меня есть блок данных со схемой как таковой:

[visitorId: string, trackingIds: array<string>, emailIds: array<string>]

Ищете способ группировать (или, может быть, rollup?) этот dataframe от whoid, где столбцы trackingIds и emailIds будут добавляться вместе. Так, например, если мой начальный df выглядит так:

visitorId   |trackingIds|emailIds
+-----------+------------+--------
|a158|      [666b]      |    [12]
|7g21|      [c0b5]      |    [45]
|7g21|      [c0b4]      |    [87]
|a158|      [666b, 777c]|    []

Я бы хотел, чтобы мой вывод df выглядел следующим образом

visitorId   |trackingIds|emailIds
+-----------+------------+--------
|a158|      [666b,666b,777c]|      [12,'']
|7g21|      [c0b5,c0b4]     |      [45, 87]

Попытка использовать операторы groupBy и agg, но не имеет большой удачи.

4b9b3361

Ответ 1

Искра 2.x

Возможно, но довольно дорого. Используя данные, которые вы предоставили:

case class Record(
    visitorId: String, trackingIds: Array[String], emailIds: Array[String])

val df = Seq(
  Record("a158", Array("666b"), Array("12")),
  Record("7g21", Array("c0b5"), Array("45")),
  Record("7g21", Array("c0b4"), Array("87")),
  Record("a158", Array("666b",  "777c"), Array.empty[String])).toDF

и вспомогательная функция:

import org.apache.spark.sql.functions.udf

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)

мы можем заполнить пробелы заполнителями:

import org.apache.spark.sql.functions.{array, lit, when}

val dfWithPlaceholders = df.withColumn(
  "emailIds", 
  when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))

collect_lists и flatten:

import org.apache.spark.sql.functions.{array, collect_listn}

val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")

df
  .groupBy($"visitorId")
  .agg(trackingIds, emailIds)

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+

С статически типизированным Dataset:

df.as[Record]
  .groupByKey(_.visitorId)
  .mapGroups { case (key, vs) => 
    vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
      case (trackingIds, emailIds) => 
        Record(key, trackingIds.flatten, emailIds.flatten)
  }}

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+

Искра 1.x

Вы можете конвертировать в RDD и группу

import org.apache.spark.sql.Row

dfWithPlaceholders.rdd
  .map {
     case Row(id: String, 
       trcks: Seq[String @ unchecked],
       emails: Seq[String @ unchecked]) => (id, (trcks, emails))
  }
  .groupByKey
  .map {case (key, vs) => vs.toArray.unzip match {
    case (trackingIds, emailIds) => 
      Record(key, trackingIds.flatten, emailIds.flatten)
  }}
  .toDF

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// |     a158|[666b, 666b, 777c]|  [12, ]|
// +---------+------------------+--------+

Ответ 2

Ответ на

@zero323 довольно много, но Spark дает нам еще большую гибкость. Как насчет следующего решения?

import org.apache.spark.sql.functions._
inventory
  .select($"*", explode($"trackingIds") as "tracking_id")
  .select($"*", explode($"emailIds") as "email_id")
  .groupBy("visitorId")
  .agg(
    collect_list("tracking_id") as "trackingIds",
    collect_list("email_id") as "emailIds")

Это, однако, оставляет все пустые коллекции (так что есть место для улучшения:))