Подтвердить что ты не робот

Spark DataFrame: делает groupBy после orderBy поддерживать этот порядок?

У меня есть фреймворк Spark 2.0 example со следующей структурой:

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.

Он содержит 24 записи для каждого идентификатора (по одному для каждого часа дня) и упорядочен по идентификатору, час, используя функцию orderBy.

Я создал агрегатор groupConcat:

  def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
    override def zero: String = ""

    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)

    override def merge(b1: String, b2: String) = b1 + b2

    override def finish(b: String) = b.substring(1)

    override def bufferEncoder: Encoder[String] = Encoders.STRING

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }.toColumn

Это помогает мне конкатенировать столбцы в строки, чтобы получить этот окончательный фрейм:

id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.

Мой вопрос: если я делаю example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"), это гарантирует, что часовые подсчеты будут правильно упорядочены в соответствующих ведрах?

Я читал, что это не обязательно для RDD (см. Spark сортировать по ключу, а затем группировать, чтобы получить упорядоченное итерабельное?), но, возможно, это отличается для DataFrames?

Если нет, как я могу обойти это?

4b9b3361

Ответ 1

groupBy после orderBy не поддерживает порядок, как указали другие. То, что вы хотите сделать, это использовать функцию Window - разделение по id и порядок по часам. Вы можете собрать по этому списку, а затем взять максимальное (наибольшее) из полученных списков, поскольку они идут кумулятивно (т.е. Первый час будет содержать только себя в списке, второй час будет иметь 2 элемента в списке и т.д.).

Полный пример кода:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val data = Seq(( "id1", 0, 12),
  ("id1", 1, 55),
  ("id1", 23, 44),
  ("id2", 0, 12),
  ("id2", 1, 89),
  ("id2", 23, 34)).toDF("id", "hour", "count")

    val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")}
    data.withColumn("collected", collect_list($"count")
                                                    .over(Window.partitionBy("id")
                                                                 .orderBy("hour")))
            .groupBy("id")
            .agg(max($"collected").as("collected"))
            .withColumn("hourly_count", mergeList($"collected"))
            .select("id", "hourly_count").show

Это держит нас в мире DataFrame. Я также упростил код UDF, который использовал OP.

Выход:

+---+------------+
| id|hourly_count|
+---+------------+
|id1|    12:55:44|
|id2|    12:89:34|
+---+------------+

Ответ 2

У меня есть случай, когда порядок не всегда сохраняется: иногда да, в основном нет.

У моей DataFrame есть 200 разделов, работающих на Spark 1.6

df_group_sort = data.orderBy(times).groupBy(group_key).agg(
                                                  F.sort_array(F.collect_list(times)),
                                                  F.collect_list(times)
                                                           )

чтобы проверить порядок, я сравниваю возвращаемые значения

F.sort_array(F.collect_list(times))

и

F.collect_list(times)
<p> (left: sort_array (collect_list()); right: collect_list())
2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000

Левый столбец всегда сортируется, а правый столбец состоит только из отсортированных блоков. Для разных исполнений take() порядок блоков в правом столбце отличается.

Ответ 3

Если вы хотите обойти реализацию в Java (Scala и Python должны быть похожими):

example.orderBy("hour")
.groupBy("id")
.agg(functions.sort_array(
  functions.collect_list( 
     functions.struct(dataRow.col("hour"),
                      dataRow.col("count"))),false)
 .as("hourly_count"));

Ответ 4

может быть или не быть одинаковым, в зависимости от количества разделов и распределения данных. Мы можем решить, используя сам rdd.

Например:

Я сохранил приведенные ниже образцы данных в файле и загрузил их в hdfs.

1,type1,300
2,type1,100
3,type2,400
4,type2,500
5,type1,400
6,type3,560
7,type2,200
8,type3,800

и выполните следующую команду:

sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()

выход:

Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))

То есть мы сгруппировали данные по типу, затем отсортировали по цене и объединили идентификаторы с символом "~" в качестве разделителя. Вышеуказанная команда может быть разбита, как показано ниже:

val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3)

val groupedData=validData.groupBy(_(1))  //group data rdds

val sortedJoinedData=groupedData.mapValues(x=>{
   val list=x.toList
   val sortedList=list.sortBy(_(2))
   val idOnlyList=sortedList.map(_(0))
   idOnlyList.mkString("~")
}
)
sortedJoinedData.collect()

мы можем взять определенную группу, используя команду

sortedJoinedData.filter(_._1=="type1").collect()

выход:

Array[(String, String)] = Array((type1,2~1~5))

Ответ 5

Нет, сортировка внутри groupByKey не обязательно будет поддерживаться, но, как известно, ее трудно воспроизвести в памяти на одном узле. Как было сказано ранее, наиболее типичный способ, которым это происходит, - это когда вещи нужно перераспределить, чтобы произошел groupByKey. Мне удалось воспроизвести это, вручную делая repartition после sort. Затем я передал результаты в groupByKey.

case class Numbered(num:Int, group:Int, otherData:Int)

// configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number 

val v =
  (1 to 100000)
    // Make waaay more groups then partitions. I added an extra integer just to mess with the sort hash computation (i.e. so it won't be monotonic, not sure if needed)
    .map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000))).toDS()
    // Be sure they are stored in a small number of partitions
    .repartition(2)
    .sort($"num")
    // Repartition again with a waaay bigger number then there are groups so that when things need to be merged you can get them out of order.
    .repartition(200)
    .groupByKey(_.group)
    .mapGroups {
      case (g, nums) =>
        nums             // all you need is .sortBy(_.num) here to fix the problem          
          .map(_.num)
          .mkString("~")
    }
    .collect()

// Walk through the concatenated strings. If any number ahead 
// is smaller than the number before it, you know that something
// is out of order.
v.zipWithIndex.map { case (r, i) =>
  r.split("~").map(_.toInt).foldLeft(0) { case (prev, next) =>
    if (next < prev) {
      println(s"*** Next: ${next} less then ${prev} for dataset ${i + 1} ***")
    }
    next
  }
}

Ответ 6

Короткий ответ: Да, часовой подсчет будет поддерживать тот же порядок.

Чтобы обобщить, важно, чтобы вы сортировали, прежде чем группировать. Также сортировка должна быть такой же, как и группа + столбец, для которого вы действительно хотите сортировку.

Пример будет выглядеть так:

employees
    .sort("company_id", "department_id", "employee_role")
    .groupBy("company_id", "department_id")
    .agg(Aggregators.groupConcat(":", 2) as "count_per_role")