Можно ли сохранить DataFrame
в иске непосредственно в Hive.
Я попытался преобразовать DataFrame
в Rdd
, а затем сохранить как текстовый файл, а затем загрузить в куст. Но мне интересно, могу ли я непосредственно сохранить DataFrame
в улей
Можно ли сохранить DataFrame
в иске непосредственно в Hive.
Я попытался преобразовать DataFrame
в Rdd
, а затем сохранить как текстовый файл, а затем загрузить в куст. Но мне интересно, могу ли я непосредственно сохранить DataFrame
в улей
Вы можете создать временную таблицу в памяти и сохранить ее в таблице кустов с помощью sqlContext.
Допустим, ваш фрейм данных myDf. Вы можете создать одну временную таблицу, используя,
myDf.createOrReplaceTempView("mytempTable")
Затем вы можете использовать простой оператор улья для создания таблицы и вывода данных из вашей временной таблицы.
sqlContext.sql("create table mytable as select * from mytempTable");
Используйте DataFrameWriter.saveAsTable
. (df.write.saveAsTable(...)
) См. руководство Spark SQL и DataFrame.
Я не вижу, чтобы df.write.saveAsTable(...)
устарела в документации Spark 2.0. Он работал для нас на Amazon EMR. Мы отлично умеем считывать данные с S3 в фреймворк данных, обрабатывать его, создавать таблицу из результата и читать ее с помощью MicroStrategy. Ответ на Vinays также работал.
вам нужно создать/создать HiveContext
import org.apache.spark.sql.hive.HiveContext;
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
Затем непосредственно сохранить данные или выбрать столбцы для хранения в виде таблицы hive
df - это dataframe
df.write().mode("overwrite").saveAsTable("schemaName.tableName");
или
df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
или
df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
SaveModes: добавлять/игнорировать/перезаписывать/ErrorIfExists
Я добавил здесь определение для HiveContext из документации Spark,
В дополнение к базовому SQLContext вы также можете создать HiveContext, который обеспечивает надстройку функциональности, предоставляемой базовым SQLContext. Дополнительные функции включают возможность записи запросов с использованием более полного анализатора HiveQL, доступа к UUF Hive и возможности чтения данных из таблиц Hive. Чтобы использовать HiveContext, вам не нужно иметь существующую установку Hive, и все источники данных, доступные для SQLContext, по-прежнему доступны. HiveContext упаковывается отдельно, чтобы избежать включения всех зависимостей Hives в сборке по умолчанию Spark.
в Spark версии 1.6.2, используя "dbName.tableName", выдает эту ошибку:
org.apache.spark.sql.AnalysisException: указание имени базы данных или других классификаторов не разрешено для временных таблиц. Если имя таблицы имеет точки (.) В ней, укажите название таблицы с обратными окнами(). `
Сохранение в Hive - это вопрос использования метода write()
вашего SQLContext:
df.write.saveAsTable(tableName)
Из Spark 2.2: используйте DataSet вместо DataFrame.
Вот версия PySpark для создания таблицы Hive из файла паркета. Возможно, вы создали файлы Parquet, используя предполагаемую схему, и теперь хотите надавить определение на метастабильность улья. Вы также можете использовать определение системы, например, AWS Glue или AWS Athena, а не только для метастабильности Hive. Здесь я использую spark.sql для создания/создания постоянной таблицы.
# Location where my parquet files are present.
df = spark.read.parquet("s3://my-location/data/")
cols = df.dtypes
buf = []
buf.append('CREATE EXTERNAL TABLE test123 (')
keyanddatatypes = df.dtypes
sizeof = len(df.dtypes)
print ("size----------",sizeof)
count=1;
for eachvalue in keyanddatatypes:
print count,sizeof,eachvalue
if count == sizeof:
total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
else:
total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
buf.append(total)
count = count + 1
buf.append(' )')
buf.append(' STORED as parquet ')
buf.append("LOCATION")
buf.append("'")
buf.append('s3://my-location/data/')
buf.append("'")
buf.append("'")
##partition by pt
tabledef = ''.join(buf)
print "---------print definition ---------"
print tabledef
## create a table using spark.sql. Assuming you are using spark 2.1+
spark.sql(tabledef);
Для внешних таблиц Hive я использую эту функцию в PySpark:
def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
print("Saving result in {}.{}".format(database, table_name))
output_schema = "," \
.join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
.replace("StringType", "STRING") \
.replace("IntegerType", "INT") \
.replace("DateType", "DATE") \
.replace("LongType", "INT") \
.replace("TimestampType", "INT") \
.replace("BooleanType", "BOOLEAN") \
.replace("FloatType", "FLOAT")\
.replace("DoubleType","FLOAT")
output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)
sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))
query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
.format(database, table_name, output_schema, save_format, database, table_name)
sparkSession.sql(query)
dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)