Подтвердить что ты не робот

Вычисление средних значений для каждого KEY в Pairwise (K, V) RDD в Spark с Python

Я хочу поделиться этим конкретным Apache Spark с решением Python, потому что документация для него довольно плохая.

Я хотел рассчитать среднее значение пар K/V (хранящихся в Pairwise RDD), с помощью KEY. Вот как выглядят данные образца:

>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]

Теперь следующая последовательность кода - это менее оптимальный способ, но она работает. Это то, что я делал, прежде чем я понял лучшее решение. Это не страшно, но, как вы увидите в разделе ответа, есть более сжатый, эффективный способ.

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it denominator (i.e. COUNT)
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]
4b9b3361

Ответ 1

Теперь гораздо лучший способ сделать это - использовать метод rdd.aggregateByKey(). Поскольку этот метод настолько плохо документирован в Apache Spark с документацией Python, и именно поэтому я написал этот Q & A - до недавнего времени я использовал приведенную выше последовательность кода. Но опять же, он менее эффективен, поэтому избегать делать это таким образом, если это необходимо.

Здесь, как сделать то же самое с помощью метода rdd.aggregateByKey() (рекомендуется)...

В KEY одновременно вычислите SUM (числитель для среднего, который мы хотим вычислить), и COUNT (знаменатель для среднего, который мы хотим вычислить):

>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b,    a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))

Если верно следующее значение каждой пары a и b (так что вы можете визуализировать, что происходит):

   First lambda expression for Within-Partition Reduction Step::
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value

   Second lambda expression for Cross-Partition Reduction Step::
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

Наконец, вычислить среднее значение для каждого КЛЮЧА и собрать результаты.

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
      [(u'2013-09-09', 11.235365503035176),
       (u'2013-09-01', 23.39500642456595),
       (u'2013-09-03', 13.53240060820617),
       (u'2013-09-05', 13.141148418977687),
   ... snip ...
  ]

Я надеюсь, что этот вопрос и ответ с помощью aggregateByKey() помогут. Если да, не забудьте также поднять вопрос. Спасибо. (◠﹏◠)

Ответ 2

На мой взгляд, более читаемый эквивалент aggregateByKey с двумя lambdas:

rdd1 = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))

Таким образом, весь средний расчет будет следующим:

avg_by_key = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
    .mapValues(lambda v: v[0]/v[1]) \
    .collectAsMap()

Ответ 3

Просто добавьте примечание об интуитивном и более коротком (но плохом) решении этой проблемы. Книга Сэм учит себя Apache Spark за 24 часа объяснил эту проблему хорошо в последней главе.

Используя groupByKey, можно легко решить проблему следующим образом:

rdd = sc.parallelize([
        (u'2013-10-09', 10),
        (u'2013-10-09', 10),
        (u'2013-10-09', 13),
        (u'2013-10-10', 40),
        (u'2013-10-10', 45),
        (u'2013-10-10', 50)
    ])

rdd \
.groupByKey() \
.mapValues(lambda x: sum(x) / len(x)) \
.collect()

Вывод:

[('2013-10-10', 45.0), ('2013-10-09', 11.0)]

Это интуитивно понятный и привлекательный, но не использовать его! groupByKey не выполняет комбинирования на картографах и приносит все парные пары ключей в редуктор.

Избегайте groupByKey как можно больше. Идите с помощью решения reduceByKey, такого как @pat.

Ответ 4

Небольшое улучшение ответа призмалитики.

Может быть случай, когда вычисление суммы может переполнять число, потому что мы суммируем огромное количество значений. Вместо этого мы могли бы сохранить средние значения и продолжать вычислять среднее значение из среднего и уменьшаться количество двух частей.

Если у вас есть две части, имеющие среднее значение и считанные как (a1, c1) и (a2, c2), общее среднее значение: total/counts = (total1 + total2)/(count1 + counts2) = (a1 * c1 + a2 * c2)/(c1 + c2)

Если мы отметим R = c2/c1, его можно переписать далее как a1/(1 + R) + a2 * R/(1 + R) Если далее обозначить Ri как 1/(1 + R), мы можем записать его как a1 * Ri + a2 * R * Ri

myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10])
sumcount_rdd = myrdd.map(lambda n : (n, 1))
def avg(A, B):
    R = 1.0*B[1]/A[1]
    Ri = 1.0/(1+R);
    av = A[0]*Ri + B[0]*R*Ri
    return (av, B[1] + A[1]);

(av, counts) = sumcount_rdd.reduce(avg)
print(av)

Этот подход может быть преобразован для значения ключа, просто используя mapValues ​​вместо map и reduceByKey вместо сокращения.

Это от: https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2