Подтвердить что ты не робот

ReduceByKey: Как это работает внутри?

Я новичок в Spark и Scala. Я был смущен тем, как функция ReduceByKey работает в Spark. Предположим, что у нас есть следующий код:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

Функция карты понятна: s - это ключ и указывает на строку из data.txt а 1 - значение.

Тем не менее, я не понял, как reduceByKey работает внутри страны? Указывает ли "а" на ключ? В качестве альтернативы, "a" указывает на "s"? Тогда что представляет собой + b? как они заполняются?

4b9b3361

Ответ 1

Позвольте разбить его на дискретные методы и типы. Это обычно раскрывает тонкости для новых разработчиков:

pairs.reduceByKey((a, b) => a + b)

становится

pairs.reduceByKey((a: Int, b: Int) => a + b)

и переименование переменных делает его немного более явным

pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)

Итак, теперь мы можем видеть, что мы просто берем накопленное значение для данного ключа и суммируем его со следующим значением этого ключа. ТЕПЕРЬ, позвольте сломать его дальше, чтобы мы могли понять ключевую часть. Итак, давайте визуализировать метод более как это:

pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
  //Turn the accumulated value into a true key->value mapping
  val accumAsMap = accumulatedValue.toMap   
  //Try to get the key current value if we've already encountered it
  accumAsMap.get(currentValue._1) match { 
    //If we have encountered it, then add the new value to the existing value and overwrite the old
    case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
    //If we have NOT encountered it, then simply add it to the list
    case None => currentValue :: accumulatedValue 
  }
})

Итак, вы можете видеть, что сокращение ByKey принимает шаблон поиска ключа и отслеживания его, так что вам не нужно беспокоиться об управлении этой частью.

Более глубокий, более верный, если вы хотите

Все, что сказано, это упрощенная версия того, что происходит, поскольку есть некоторые оптимизации, которые здесь выполняются. Эта операция ассоциативна, поэтому искровой двигатель сначала выполняет эти сокращения локально (часто называемый снижением на стороне карты), а затем снова у водителя. Это экономит сетевой трафик; вместо того, чтобы отправлять все данные и выполнять операцию, он может уменьшить ее как можно меньше, а затем отправить это сокращение по проводу.

Ответ 2

Одним из требований к функции reduceByKey является то, что должно быть ассоциативным. Чтобы построить некоторую интуицию о том, как работает reduceByKey, сначала посмотрим, как ассоциативная ассоциативная функция помогает нам в параллельном вычислении:

associative function in action

Как мы видим, мы можем разбить оригинальный коллекцию на части и, применив ассоциативную функцию, мы можем скопировать итог. Последовательный случай тривиален, мы используем его: 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10.

Ассоциативность позволяет использовать эту же функцию последовательно и параллельно. reduceByKey использует это свойство для вычисления результата из RDD, который является распределенной коллекцией, состоящей из разделов.

Рассмотрим следующий пример:

// collection of the form ("key",1),("key,2),...,("key",20) split among 4 partitions
val rdd =sparkContext.parallelize(( (1 to 20).map(x=>("key",x))), 4)
rdd.reduceByKey(_ + _)
rdd.collect()
> Array[(String, Int)] = Array((key,210))

В искровом режиме данные распределяются по разделам. Для следующей иллюстрации (4) разделы расположены слева, заключенные в тонкие линии. Во-первых, мы применяем функцию локально к каждому разделу, последовательно в разделе, но мы запускаем все 4 раздела параллельно. Затем результат каждого локального вычисления агрегируется путем повторного применения той же функции и, наконец, доходит до результата.

enter image description here

reduceByKey - это специализация aggregateByKey aggregateByKey, которая выполняет две функции: одну, которая применяется к каждому разделу (последовательно) и к тем, которые применяются к результатам каждого раздела (параллельно). reduceByKey использует одну и ту же ассоциативную функцию в обоих случаях: выполнять последовательные вычисления на каждом разделе и затем комбинировать эти результаты с конечным результатом, как мы здесь проиллюстрировали.

Ответ 3

В вашем примере

val counts = pairs.reduceByKey((a,b) => a+b)

a и b являются Int аккумуляторами для _2 кортежей в pairs. reduceKey будет принимать два кортежа с тем же значением s и использовать их значения _2 как a и b, создавая новый Tuple[String,Int]. Эта операция повторяется до тех пор, пока не будет только один кортеж для каждой клавиши s.

В отличие от не-Spark (или, действительно, непараллельного) reduceByKey, где первый элемент всегда является аккумулятором, а второй значением, reduceByKey работает распределенным образом, то есть каждый node уменьшит его набор кортежей в коллекцию однозначно привязанных кортежей, а затем уменьшаем кортежи из нескольких узлов до тех пор, пока не будет окончательный набор кортежей с уникальным ключом. Это означает, что при уменьшении результатов от узлов a и b представляют собой уже уменьшенные аккумуляторы.