Подтвердить что ты не робот

Как сохранить DataFrame непосредственно в Hive?

Можно ли сохранить DataFrame в иске непосредственно в Hive.

Я попытался преобразовать DataFrame в Rdd, а затем сохранить как текстовый файл, а затем загрузить в куст. Но мне интересно, могу ли я непосредственно сохранить DataFrame в улей

4b9b3361

Ответ 1

Вы можете создать временную таблицу в памяти и сохранить ее в таблице кустов с помощью sqlContext.

Допустим, ваш фрейм данных myDf. Вы можете создать одну временную таблицу, используя,

myDf.createOrReplaceTempView("mytempTable") 

Затем вы можете использовать простой оператор улья для создания таблицы и вывода данных из вашей временной таблицы.

sqlContext.sql("create table mytable as select * from mytempTable");

Ответ 3

Я не вижу, чтобы df.write.saveAsTable(...) устарела в документации Spark 2.0. Он работал для нас на Amazon EMR. Мы отлично умеем считывать данные с S3 в фреймворк данных, обрабатывать его, создавать таблицу из результата и читать ее с помощью MicroStrategy. Ответ на Vinays также работал.

Ответ 4

вам нужно создать/создать HiveContext

import org.apache.spark.sql.hive.HiveContext;

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

Затем непосредственно сохранить данные или выбрать столбцы для хранения в виде таблицы hive

df - это dataframe

df.write().mode("overwrite").saveAsTable("schemaName.tableName");

или

df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");

или

df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");

SaveModes: добавлять/игнорировать/перезаписывать/ErrorIfExists

Я добавил здесь определение для HiveContext из документации Spark,

В дополнение к базовому SQLContext вы также можете создать HiveContext, который обеспечивает надстройку функциональности, предоставляемой базовым SQLContext. Дополнительные функции включают возможность записи запросов с использованием более полного анализатора HiveQL, доступа к UUF Hive и возможности чтения данных из таблиц Hive. Чтобы использовать HiveContext, вам не нужно иметь существующую установку Hive, и все источники данных, доступные для SQLContext, по-прежнему доступны. HiveContext упаковывается отдельно, чтобы избежать включения всех зависимостей Hives в сборке по умолчанию Spark.


в Spark версии 1.6.2, используя "dbName.tableName", выдает эту ошибку:

org.apache.spark.sql.AnalysisException: указание имени базы данных или других классификаторов не разрешено для временных таблиц. Если имя таблицы имеет точки (.) В ней, укажите название таблицы с обратными окнами(). `

Ответ 6

Вот версия PySpark для создания таблицы Hive из файла паркета. Возможно, вы создали файлы Parquet, используя предполагаемую схему, и теперь хотите надавить определение на метастабильность улья. Вы также можете использовать определение системы, например, AWS Glue или AWS Athena, а не только для метастабильности Hive. Здесь я использую spark.sql для создания/создания постоянной таблицы.

   # Location where my parquet files are present.
    df = spark.read.parquet("s3://my-location/data/")
    cols = df.dtypes
    buf = []
    buf.append('CREATE EXTERNAL TABLE test123 (')
    keyanddatatypes =  df.dtypes
    sizeof = len(df.dtypes)
    print ("size----------",sizeof)
    count=1;
    for eachvalue in keyanddatatypes:
        print count,sizeof,eachvalue
        if count == sizeof:
            total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
        else:
            total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
        buf.append(total)
        count = count + 1

    buf.append(' )')
    buf.append(' STORED as parquet ')
    buf.append("LOCATION")
    buf.append("'")
    buf.append('s3://my-location/data/')
    buf.append("'")
    buf.append("'")
    ##partition by pt
    tabledef = ''.join(buf)

    print "---------print definition ---------"
    print tabledef
    ## create a table using spark.sql. Assuming you are using spark 2.1+
    spark.sql(tabledef);

Ответ 7

Для внешних таблиц Hive я использую эту функцию в PySpark:

def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
    print("Saving result in {}.{}".format(database, table_name))
    output_schema = "," \
        .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
        .replace("StringType", "STRING") \
        .replace("IntegerType", "INT") \
        .replace("DateType", "DATE") \
        .replace("LongType", "INT") \
        .replace("TimestampType", "INT") \
        .replace("BooleanType", "BOOLEAN") \
        .replace("FloatType", "FLOAT")\
        .replace("DoubleType","FLOAT")
    output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)

    sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))

    query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
        .format(database, table_name, output_schema, save_format, database, table_name)
    sparkSession.sql(query)
    dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)