Подтвердить что ты не робот

Использование reduceByKey в Apache Spark (Scala)

У меня есть список Tuples типа: (идентификатор пользователя, имя, счет).

Например,

val x = sc.parallelize(List(
    ("a", "b", 1),
    ("a", "b", 1),
    ("c", "b", 1),
    ("a", "d", 1))
)

Я пытаюсь уменьшить эту коллекцию до типа, в котором каждый имя элемента подсчитывается.

Итак, в приведенном выше val x преобразуется в:

(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))

Вот код, который я использую в настоящее время:

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

val grouped = byKey.groupByKey
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey

grouped2.foreach(println)

Я пытаюсь использовать reduceByKey, поскольку он работает быстрее, чем groupByKey.

Как можно уменьшитьByKey вместо кода выше, чтобы обеспечить то же отображение?

4b9b3361

Ответ 1

Следуя вашему коду:

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

Вы можете сделать:

val reducedByKey = byKey.reduceByKey(_ + _)

scala> reducedByKey.collect.foreach(println)
((a,d),1)
((a,b),2)
((c,b),1)

PairRDDFunctions[K,V].reduceByKey принимает ассоциативную функцию сокращения, которая может быть применена к типу V RDD [(K, V)]. Другими словами, вам нужна функция f[V](e1:V, e2:V) : V. В этом конкретном случае с суммой на Ints: (x:Int, y:Int) => x+y или _ + _ в короткой форме подчеркивания.

Для записи: reduceByKey работает лучше, чем groupByKey, потому что он должен применять функцию уменьшения локально до фазы перетасовки/уменьшения. groupByKey заставит перетасовать все элементы перед группировкой.

Ответ 2

Структура источника данных: RDD [(String, String, Int)] и reduceByKey может использоваться только в том случае, если структура данных RDD [(K, V)].

val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
val reduced = kv.reduceByKey(_ + _)       // reduced is RDD[((String, String), Int)]
val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
val grouped = kv2.groupByKey()            // grouped is RDD[(String, Iterable[(String, Int)])]
grouped.foreach(println)

Ответ 3

Синтаксис ниже:

reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V],

который говорит, что для того же ключа в RDD он принимает значения (которые будут определенно одного типа) выполняет операцию, предоставляемую как часть функции, и возвращает значение того же типа, что и для родительского RDD.