Подтвердить что ты не робот

Объяснение агрегатной функции scala

Я еще не понимаю, что такое агрегатная функция:

Например, имея:

val x = List(1,2,3,4,5,6)
val y = x.par.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x,y) => (x._1 + y._1, x._2 + y._2))

Результат будет: (21,6)

Ну, я думаю, что (x,y) => (x._1 + y._1, x._2 + y._2) должен получить результат параллельно, например, он будет (1 + 2, 1 + 1) и т.д.

Но именно эта часть меня путает:

(x, y) => (x._1 + y, x._2 + 1)

почему x._1 + y? и здесь x._2 есть 0?

Спасибо заранее.

4b9b3361

Ответ 1

В документации :

def aggregate[B](z: ⇒ B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B

Сопоставляет результаты применения оператора к последующим элементам.

Это более общая форма сгиба и сокращение. Аналогично семантики, но не требует, чтобы результат был супертипом тип элемента. Он перемещает элементы в разных разделах последовательно, используя seqop для обновления результата, а затем применяется combop к результатам из разных разделов. Реализация эта операция может работать на произвольном количестве сборов разделов, поэтому combop может вызываться произвольным числом раз.

Например, можно обрабатывать некоторые элементы, а затем производить множество. В этом случае seqop обработает элемент и добавит его в список, в то время как combop объединяет два списка из разных разделов. Начальное значение z будет пустым.

pc.aggregate(Set[Int]())(_ += process(_), _ ++ _)

Другой пример: вычисление геометрического среднего из набора двойников (один обычно для этого требуются большие удваивания). B тип накопленного результаты z начальное значение для накопленного результата раздел - это, как правило, нейтральный элемент для seqop оператора (например, Nil для конкатенации списка или 0 для суммирования) и может оцениваться более одного раза подряд оператором, используемым для накопления результаты в группе разделяют ассоциативный оператор, используемый для объединить результаты из разных разделов

В вашем примере B есть Tuple2[Int, Int]. Метод seqop затем берет из списка один элемент из списка, в виде y и обновляет агрегат B до (x._1 + y, x._2 + 1). Таким образом, он увеличивает второй элемент в кортеже. Это эффективно помещает сумму элементов в первый элемент кортежа и количество элементов во второй элемент кортежа.

Метод combop затем берет результаты из каждого потока параллельного выполнения и объединяет их. Комбинация с помощью добавления дает те же результаты, что и в том случае, если они выполнялись в списке последовательно.

Использование B в качестве кортежа, вероятно, является путаной частью этого. Вы можете разбить проблему на две проблемы, чтобы лучше понять, что это делает. res0 - это первый элемент кортежа результата, а res1 - второй элемент в корте. результатов.

// Sums all elements in parallel.
scala> x.par.aggregate(0)((x, y) => x + y, (x, y) => x + y)
res0: Int = 21

// Counts all elements in parallel.    
scala> x.par.aggregate(0)((x, y) => x + 1, (x, y) => x + y)
res1: Int = 6

Ответ 2

Прежде всего, благодаря ответу Диего, который помог мне связать точки в понимании функции aggregate().

Позвольте мне признаться, что я не мог спать прошлой ночью должным образом, потому что я не мог понять, как aggregate() работает внутри, я сегодня хорошо поспаю: -)

Давайте начнем понимать его

val result = List(1,2,3,4,5,6,7,8,9,10).par.aggregate((0, 0))
         (
          (x, y) => (x._1 + y, x._2 + 1), 
          (x,y) =>(x._1 + y._1, x._2 + y._2)
         )

результат: (Int, Int) = (55,10)

агрегированная функция имеет 3 части:

  • начальное значение аккумуляторов: кортеж (0,0) здесь
  • seqop: он работает как foldLeft с начальным значением 0
  • combop: он объединяет результат, созданный посредством распараллеливания (эта часть мне трудно понять)

Познайте все 3 части независимо:

part-1: Начальный набор (0,0)

Агрегат() начинается с начального значения аккумуляторов x, которое здесь (0,0). Для вычисления суммы используется первый кортеж x._1, который изначально равен 0. Второй набор x._2 используется для вычисления общего количества элементов в списке.

part-2: (x, y) = > (x._1 + y, x._2 + 1)

Если вы знаете, как foldLeft работает в scala, тогда это должно быть легко понять эту часть. Выше функция работает так же, как foldLeft в нашем списке (1,2,3,4... 10).

Iteration#      (x._1 + y, x._2 + 1)
     1           (0+1, 0+1)
     2           (1+2, 1+1)
     3           (3+3, 2+1)
     4           (6+4, 3+1)
     .             ....
     .             ....
     10          (45+10, 9+1)

Таким образом, после всех 10 итераций вы получите результат (55,10). Если вы понимаете эту часть, остальное очень легко, но для меня это была самая трудная часть в понимании того, что все необходимые вычисления закончены, то что использует вторая часть, то есть compop - следите за обновлениями: -)

часть 3: (x, y) = > (x._1 + y._1, x._2 + y._2)

Ну, эта третья часть - combOp, которая объединяет результат, сгенерированный разными потоками во время распараллеливания, помните, что мы использовали 'par' в нашем коде, чтобы включить параллельное вычисление списка:

Список (1,2,3,4,5,6,7,8,9,10).par.aggregate(....)

Исход Apache эффективно использует агрегатную функцию для параллельного вычисления RDD.

Предположим, что наш список (1,2,3,4,5,6,7,8,9,10) вычисляется тремя потоками параллельно. Здесь каждый поток работает с неполным списком, а затем наш combate() combOp будет комбинировать результат вычисления каждого потока с помощью приведенного ниже кода:

(x,y) =>(x._1 + y._1, x._2 + y._2)

Исходный список: Список (1,2,3,4,5,6,7,8,9,10)

Thread1 начинает вычисление в частичном списке say (1,2,3,4), Thread2 вычисляет (5,6,7,8) и Thread3 вычисляет частичный список say (9,10)

В конце вычисления результат Thread-1 будет равен (10,4), результат Thread-2 будет (26,4), а результат Thread-3 будет (19,2).

В конце параллельного вычисления мы будем иметь ((10,4), (26,4), (19,2))

Iteration#      (x._1 + y._1, x._2 + y._2)
     1           (0+10, 0+4)
     2           (10+26, 4+4)
     3           (36+19, 8+2)

который равен (55,10).

Наконец, позвольте мне повторить итерацию этого задания seqOp, чтобы вычислить сумму всех элементов списка и общее количество списков, тогда как коммандовое задание должно сочетать разные частичные результаты, сгенерированные во время распараллеливания.

Надеюсь, что приведенное выше объяснение поможет вам понять агрегат().

Ответ 3

aggregate принимает 3 параметра: начальное значение, функцию вычисления и комбинационную функцию.

То, что он делает, в основном разбивает коллекцию на несколько потоков, вычисляет частичные результаты, используя функцию вычисления, а затем объединяет все эти частичные результаты с помощью комбинированной функции.

Из того, что я могу сказать, ваша примерная функция вернет пару (a, b), где a - сумма значений в списке, b - количество значений в списке. Действительно, (21, 6).

Как это работает? Начальное значение - пара (0,0). Для пустого списка мы имеем сумму 0 и число элементов 0, поэтому это правильно.

Ваша вычислительная функция принимает пару (Int, Int) x, которая является вашим частичным результатом, и Int y, которая является следующим значением в списке. Это ваш:

(x, y) => (x._1 + y, x._2 + 1)

В самом деле, результат, который мы хотим, состоит в том, чтобы увеличить левый элемент x (аккумулятора) на y и правый элемент x (счетчик) на 1 для каждого y.

Ваша комбинированная функция принимает пару (Int, Int) x и пару (Int, Int) y, которые являются вашими двумя частичными результатами из разных параллельных вычислений, и объединяет их как:

(x,y) => (x._1 + y._1, x._2 + y._2)

Действительно, мы суммируем независимо левые части пар и правые части пар.

Ваше замешательство возникает из-за того, что x и y в первой функции НЕ являются теми же x и y второй функции. В первой функции у вас есть x типа начального значения и y типа элементов коллекции, и вы возвращаете результат типа x. Во второй функции ваши два параметра имеют одинаковый тип исходного значения.

Надеюсь, теперь это станет яснее!