У меня очень большой набор данных, который загружается в Hive. Он состоит из примерно 1,9 миллиона строк и 1450 столбцов. Мне нужно определить "охват" каждого из столбцов, то есть долю строк, которые имеют значения, отличные от NaN для каждого столбца.
Вот мой код:
from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string
sc = SparkContext(appName="compute_coverages") ## Create the context
sqlContext = HiveContext(sc)
df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()
covgs=sc.parallelize(df.columns)
.map(lambda x: str(x))
.map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))
Попробуйте это в оболочке pyspark, если я затем сделаю covgs.take(10), он вернет довольно большой стек ошибок. В нем говорится, что есть проблема с сохранением файла /usr/lib64/python2.6/pickle.py
. Это заключительная часть ошибки:
py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Если есть лучший способ сделать это, как я пытаюсь, я открыт для предложений. Я не могу использовать pandas, хотя, поскольку он не доступен в настоящее время в кластере, над которым я работаю, и у меня нет прав на его установку.