У меня есть фреймворк Spark 2.0 example
со следующей структурой:
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
Он содержит 24 записи для каждого идентификатора (по одному для каждого часа дня) и упорядочен по идентификатору, час, используя функцию orderBy.
Я создал агрегатор groupConcat
:
def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
override def zero: String = ""
override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
override def merge(b1: String, b2: String) = b1 + b2
override def finish(b: String) = b.substring(1)
override def bufferEncoder: Encoder[String] = Encoders.STRING
override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn
Это помогает мне конкатенировать столбцы в строки, чтобы получить этот окончательный фрейм:
id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
Мой вопрос: если я делаю example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count")
, это гарантирует, что часовые подсчеты будут правильно упорядочены в соответствующих ведрах?
Я читал, что это не обязательно для RDD (см. Spark сортировать по ключу, а затем группировать, чтобы получить упорядоченное итерабельное?), но, возможно, это отличается для DataFrames?
Если нет, как я могу обойти это?