Подтвердить что ты не робот

Просмотр содержимого RDD в Python Spark?

Запуск простого приложения в pyspark.

f = sc.textFile("README.md")
wc = f.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)

Я хочу просмотреть содержимое RDD с помощью действия foreach:

wc.foreach(print)

Это вызывает синтаксическую ошибку:

SyntaxError: invalid syntax

Что мне не хватает?

4b9b3361

Ответ 1

Эта ошибка связана с тем, что print не является функцией в Python 2.6.

Вы можете определить вспомогательный UDF, который выполняет печать, или использовать библиотеку __future__ для обработки print как функции:

>>> from operator import add
>>> f = sc.textFile("README.md")
>>> def g(x):
...     print x
...
>>> wc.foreach(g)

или

>>> from __future__ import print_function
>>> wc.foreach(print)

Однако, я думаю, было бы лучше использовать collect(), чтобы вернуть содержимое RDD в драйвер, потому что foreach выполняется на рабочих узлах, и выходы могут не обязательно появляться в вашем драйвере/оболочке (возможно, это будет в режиме local, но не при работе в кластере).

>>> for x in wc.collect():
...     print x

Ответ 2

В Spark 2.0 (я не тестировал более ранние версии). Просто:

print myRDD.take(n)

Где n - количество строк, а myRDD - wc в вашем случае.

Ответ 3

Если вы хотите увидеть содержимое RDD, тогда yes collect - это один параметр, но он извлекает все данные в драйвер, поэтому может возникнуть проблема

<rdd.name>.take(<num of elements you want to fetch>)

Лучше, если вы хотите увидеть только образец

Запуск foreach и попытка печати, я не рекомендую это, потому что если вы используете это в кластере, тогда журналы печати будут локальными для исполнителя и будут печатать для данных, доступных этому исполнителю, Оператор print не меняет состояние, поэтому он не является логически неправильным. Чтобы получить все журналы, вам нужно будет сделать что-то вроде

**Pseudocode**
collect
foreach print

Но это может привести к поломке задания, так как сбор всех данных о драйвере может привести к его сбою. Я бы предложил использовать команду принять, или если вы хотите ее проанализировать, используйте образец для сбора на драйвере или записи в файл, а затем проанализируйте его.

Ответ 4

Попробуйте следующее:

data = f.flatMap(lambda x: x.split(' '))
map = data.map(lambda x: (x, 1))
mapreduce = map.reduceByKey(lambda x,y: x+y)
result = mapreduce.collect()

Обратите внимание, что при запуске collect() RDD - это распределенный набор данных, агрегируется в драйвере node и по существу преобразуется в список. Таким образом, очевидно, что собирать() набор данных 2T не рекомендуется. Если вам нужно всего несколько образцов из вашего RDD, используйте take (10).

Ответ 5

В последнем документе вы можете использовать rdd.collect(). foreach (println) в драйвере для отображения всех, но это может вызвать проблемы с памятью в драйвере, лучше всего использовать rdd.take(wish_number)

https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

Чтобы напечатать все элементы в драйвере, можно использовать метод collect(), чтобы сначала привести RDD к драйверу node таким образом: rdd.collect(). foreach (println). Это может привести к тому, что драйвер исчерпает память, потому что collect() извлекает весь RDD на одну машину; если вам нужно всего лишь напечатать несколько элементов RDD, более безопасный подход - использовать метод take(): rdd.take(100).foreach(println).

Ответ 6

Вы можете просто собрать весь RDD (который вернет список) и распечатать указанный список:

print(wc.collect)