Подтвердить что ты не робот

Объясните совокупную функциональность в Spark

Я ищу более подробное объяснение совокупной функциональности, доступной через искру в python.

Пример, который у меня есть, следующий (используя pyspark из версии Spark 1.2.0)

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Вывод:

(10, 4)

Я получаю ожидаемый результат (10,4), который представляет собой сумму 1+2+3+4 и 4 элемента. Если я изменил начальное значение, переданное агрегированной функции на (1,0) из (0,0), я получаю следующий результат

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Вывод:

(19, 4)

Значение увеличивается на 9. Если я изменю его на (2,0), значение будет (28,4) и т.д.

Может кто-нибудь объяснить мне, как рассчитывается это значение? Я ожидал, что значение увеличится на 1, а не на 9, ожидая увидеть (11,4) вместо этого, я вижу (19,4).

4b9b3361

Ответ 1

У меня недостаточно очков репутации, чтобы прокомментировать предыдущий ответ Маасга. На самом деле нулевое значение должно быть "нейтральным" по отношению к seqop, что означает, что оно не будет мешать результату seqop, например, 0 к сложению или 1 к *;

Никогда не пытайтесь использовать ненейтральные значения, так как они могут применяться произвольное время. Это поведение связано не только с числом разделов.

Я попробовал тот же эксперимент, как указано в вопросе. с 1 разделом нулевое значение применялось 3 раза. с 2 перегородками, 6 раз. с 3 разделами, 9 раз, и это будет продолжаться.

Ответ 2

Я не был полностью убежден в принятом ответе, и ответ JohnKnight помог, поэтому здесь моя точка зрения:

Прежде всего, позвольте объяснить aggregate() моими собственными словами:

Прототип:

aggregate(zeroValue, seqOp, combOp)

Описание:

aggregate() позволяет вам взять СДР и сгенерировать одно значение, отличающееся от того, которое было сохранено в исходном СДР.

Параметры:

  1. zeroValue: значение инициализации, для вашего результата, в желаемом формат.
  2. seqOp: операция, которую вы хотите применить к записям RDD. Работает один раз для каждая запись в разделе.
  3. combOp: определяет, как получаются объекты (по одному на каждый раздел), объединяется.

Пример:

Вычислить сумму списка и длину этого списка. Вернуть результат в паре (sum, length).

В оболочке Spark я сначала создал список из 4 элементов с 2 разделами:

listRDD = sc.parallelize([1,2,3,4], 2)

Затем я определил свой seqOp:

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

и мой combOp:

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

а потом я агрегировал:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

Как видите, я дал описательные имена своим переменным, но позвольте мне объяснить это подробнее:

Первый раздел имеет подсписок [1, 2]. Мы будем применять seqOp к каждому элементу этого списка, и это приведет к локальному результату, паре (sum, length), который будет отражать результат локально, только в этом первом разделе.

Итак, начнем: local_result инициализируется параметром zeroValue, которому мы предоставили aggregate(), т.е. (0, 0) и list_element - первый элемент списка, т.е. 1. В результате это что происходит:

0 + 1 = 1
0 + 1 = 1

Теперь локальный результат равен (1, 1), это означает, что до сих пор для 1-го раздела после обработки только первого элемента сумма равна 1 и длине 1. Обратите внимание, что local_result получает обновлено с (0, 0) до (1, 1).

1 + 2 = 3
1 + 1 = 2

и теперь локальный результат - (3, 2), который будет окончательным результатом 1-го раздела, поскольку они не являются другими элементами в подсписке 1-го раздела.

Проделав то же самое для второго раздела, мы получим (7, 2).

Теперь мы применяем combOp к каждому локальному результату, чтобы мы могли сформировать окончательный глобальный результат, например так: (3,2) + (7,2) = (10, 4)


Пример описан в 'figure':

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

Вдохновлен этим великолепным примером.


Так что теперь, если zeroValue не (0, 0), а (1, 0), можно ожидать, что (8 + 4, 2 + 2) = (12, 4), что не объясняет, что вы испытываете, Даже если мы изменим количество разделов в моем примере, я не смогу получить это снова.

Ключевым моментом здесь является ответ JohnKnight, в котором говорится, что zeroValue не только аналогичен количеству разделов, но может применяться больше раз, чем вы ожидаете.

Ответ 3

Агрегат позволяет преобразовывать и комбинировать значения RDD по желанию.

Он использует две функции:

Первый преобразует и добавляет элементы исходной коллекции [T] в локальную совокупность [U] и принимает вид: (U, T) = > U. Вы можете видеть это как складку, и поэтому она также для этой операции требуется ноль. Эта операция применяется локально к каждому разделу параллельно.

Вот где ключ заключается в следующем: единственное значение, которое следует использовать здесь, - это значение ZERO для операции сокращения. Эта операция выполняется локально на каждом разделе, поэтому добавление чего-либо к этому нулевому значению добавит к результату, умноженному на количество разделов RDD.

Вторая операция принимает 2 значения типа результата предыдущей операции [U] и объединяет ее в одно значение. Эта операция уменьшит частичные результаты каждого раздела и приведет к фактической сумме.

Например: Учитывая RDD строк:

val rdd:RDD[String] = ???

Предположим, вы хотите создать совокупность длины строк в этом RDD, чтобы вы сделали:

1) Первая операция преобразует строки в размер (int) и накапливает значения для размера.

val stringSizeCummulator: (Int, String) => Int  = (total, string) => total + string.lenght`

2) обеспечивают ZERO для операции добавления (0)

val ZERO = 0

3) операцию для добавления двух целых чисел:

val add: (Int, Int) => Int = _ + _

Объединяя все это:

rdd.aggregate(ZERO, stringSizeCummulator, add)

Итак, зачем нужен ZERO? Когда функция куммулятора применяется к первому элементу раздела, нет текущей суммы. Здесь используется ZERO.

Eg. Мой RDD: - Раздел 1: [ "Перейти", "над" ] - Раздел 2: [ "the", "wall" ]

Это приведет к:

Р1:

  • stringSizeCummulator (ZERO, "Jump" ) = 4
  • stringSizeCummulator (4, "over" ) = 8

P2:

  • stringSizeCummulator (ZERO, "the" ) = 3
  • stringSizeCummulator (3, "wall" ) = 7

Уменьшить: добавить (P1, P2) = 15

Ответ 4

Вы можете использовать следующий код (в scala), чтобы точно увидеть, что делает aggregate. Он создает дерево всех операций сложения и слияния:

sealed trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]

val zero : Tree[Int] = Leaf(0)
val rdd = sc.parallelize(1 to 4).repartition(3)

И затем в оболочке:

scala> rdd.glom().collect()
res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))

Итак, мы имеем эти 3 раздела: [4], [1,2] и [3].

scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))

Вы можете представить результат как дерево:

+
| \__________________
+                    +
| \________          | \
+          +         +   2
| \        | \       | \         
0  +       0  3      0  1
   | \
   0  4

Вы можете видеть, что первый нулевой элемент создается в драйвере node (слева от дерева), а затем результаты для всех разделов объединяются один за другим. Вы также видите, что если вы замените 0 на 1, как и в своем вопросе, он добавит по 1 к каждому результату в каждом разделе, а также добавит 1 к исходному значению в драйвере. Таким образом, общее количество времени, которое вы даете нулю, используется:

number of partitions + 1.

Итак, в вашем случае результат

aggregate(
  (X, Y),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

будет:

(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)

Реализация aggregate довольно проста. Он определен в RDD.scala, строка 1107:

  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
}

Ответ 5

Великие объяснения, это действительно помогло мне понять нижнюю работу агрегатной функции. Я играл с ним некоторое время и узнал, как показано ниже.

  • если вы используете acc as (0,0), то он не изменит результат выхода из функции.

  • если исходный аккумулятор изменен, тогда он обработает результат, как показано ниже

[сумма элементов RDD + начальное значение * Количество разделов RDD + acc начальное значение]

для вопроса здесь, я бы предложил проверить разделы, так как количество разделов должно быть 8 согласно моему пониманию, так как каждый раз, когда мы обрабатываем seq op в разделе RDD, он начинается с начальной суммы результата acc а также когда он будет делать гребень Op, он снова будет использовать начальное значение acc.

например.  Список (1,2,3,4) и acc (1,0)

Получить разделы в scala с помощью RDD.partitions.size

если разделы являются 2, а число элементов равно 4, то = > [10 + 1 * 2 + 1] = > (13,4)

если раздел равен 4, а число элементов равно 4, тогда = > [10 + 1 * 4 + 1] = > (15,4)

Надеюсь, что это поможет, вы можете проверить здесь для объяснения. Спасибо.

Ответ 6

Для людей, которые ищут Scala Эквивалентный код для приведенного выше примера - вот оно. Та же логика, тот же ввод/результат.

scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

scala> listRDD.collect()
res7: Array[Int] = Array(1, 2, 3, 4)

scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
res10: (Int, Int) = (10,4)

Ответ 7

Я пробую много экспериментов по этому вопросу. Лучше установить множество разделов для агрегата. seqOp обработает каждый партион и применит начальное значение, что еще больше, combOp также применит начальное значение при объединении всех разделов. Итак, я представляю формат для этого вопроса:

final result = sum(list) + num_Of_Partitions * initial_Value + 1

Ответ 8

Благодаря гсамарас.

Мой обзор, как показано ниже, enter image description here

Ответ 9

Я объясню концепцию Агрегатной операции в Spark следующим образом:

Определение агрегатной функции

**def aggregate** (initial value)(an intra-partition sequence operation)(an inter-partition combination operation)

val flowers = sc.parallelize(List(11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16), 4) → 4 представляет количество разделов, доступных в нашем кластере Spark.

Следовательно, rdd распределяется на 4 раздела как:

11, 12, 13
24, 25, 26
35, 36, 37
24, 25, 16

мы делим постановку задачи на две части: Первая часть проблемы заключается в агрегировании общего количества цветов, собранных в каждом квадранте; что агрегация последовательности внутри раздела

11+12+13 = 36
24+25+26 = 75
35+36+37 = 108
24+25 +16 = 65

Вторая часть проблемы заключается в суммировании этих отдельных агрегатов по разделам; что агрегация между разделами.

36 + 75 + 108 + 65 = 284

Сумма, хранящаяся в СДР, может в дальнейшем использоваться и обрабатываться для любого вида преобразования или другого действия

Таким образом, код становится как:

val sum = flowers.aggregate(0)((acc, value) => (acc + value), (x,y) => (x+y)) или val sum = flowers.aggregate(0)(_+_, _+_)
Answer: 284

Пояснение: (0) - это аккумулятор Первый + - сумма внутри раздела, добавляющая общее количество цветов, собранных каждым сборщиком в каждом квадранте сада. Второй + - это сумма между разбиениями, которая объединяет итоговые суммы по каждому квадранту.

Случай 1:

Предположим, если нам нужно уменьшить функции после начального значения. Что произойдет, если начальное значение не будет равно нулю? Если бы это было 4, например:

Число будет добавлено к каждому внутрираздельному агрегату, а также к межраздельному агрегату:

Итак, первый расчет будет следующим:

11+12+13 = 36 + 5 = 41
24+25+26 = 75 + 5 = 80
35+36+37 = 108 + 5 = 113
24+25 +16 = 65 + 5 = 70

Здесь вычисление агрегации между разделами с начальным значением 5:

partition1 + partition2 + partition3+ partition4 + 5 = 41 + 80 + 113 + 70 = 309

Итак, перейдем к вашему запросу: Сумма может быть рассчитана на основе количества разделов, на которые распределяются данные rdd. Я думал, что ваши данные распространяются, как показано ниже, и поэтому у вас есть результат как (19, 4). Таким образом, при выполнении агрегатной операции необходимо указывать номер значения раздела:

val list = sc.parallelize(List(1,2,3,4))
val list2 = list.glom().collect
val res12 = list.aggregate((1,0))(
      (acc, value) => (acc._1 + value, acc._2 + 1),
      (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

результат:

list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at command-472682101230301:1
list2: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4))
res12: (Int, Int) = (19,4)

Объяснение: Поскольку ваши данные распределены по 8 разделам, результат будет похож (с использованием вышеописанной логики)

добавление внутри раздела:

0+1=1
1+1=2
0+1=1
2+1=3
0+1=1
3+1=4
0+1=1
4+1=5

total=18

расчет между разделами:

18+1 (1+2+1+3+1+4+1+5+1) = 19

спасибо