Я пытаюсь записать файл parquet
на Amazon S3
с помощью Spark 1.6.1
. Небольшой parquet
, который я создаю, ~2GB
, когда он написан, не так много данных. Я пытаюсь доказать Spark
как платформу, которую я могу использовать.
В основном, я собираюсь настроить star schema
на dataframes
, тогда я собираюсь записать эти таблицы в паркет. Данные поступают из файлов csv, предоставляемых поставщиком, и я использую Spark как платформу ETL
. В настоящее время у меня есть кластер 3 node в ec2(r3.2xlarge)
So 120GB
памяти исполнителей и всего 16 ядер.
Входные файлы составляют около 22 ГБ, и сейчас я извлекаю около 2 ГБ данных. В конце концов, это будет много терабайт, когда я начну загружать полный набор данных.
Вот моя искра / scala pseudocode
:
def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
header.registerTempTable("header")
sqlCtx.cacheTable("header")
//Setup fact table/df
val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
val df = sqlCtx.createDataFrame(records, factSchema)
df.registerTempTable("fact")
val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
println(results.count())
results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}
Счет занимает около 2 минут для 465884512 строк. Запись в паркет занимает 38 минут
Я понимаю, что coalesce
делает перетасовку с драйвером, который пишет. но количество времени, которое он принимает, заставляет меня думать, что я делаю что-то серьезно неправильно. Без coalesce
это все равно занимает 15 минут, а IMO все еще слишком длинный и дает мне тонну небольших файлов parquet
. Я хотел бы иметь один большой файл в день данных, которые у меня будут. У меня также есть код для разбиения на значение поля, и он так же медленный. Я также попытался вывести это на csv
и занимает ~ 1 час.
Кроме того, я не устанавливаю реквизиты времени выполнения, когда я отправляю свою работу. Моя статистика консоли для одного задания:
- Живые рабочие: 2
- Используемые ядра: 16 Всего, 16 Используется
- Используемая память: 117,5 ГБ Всего, 107,5 ГБ Используется
- Приложения: 1 Запуск, 5 Завершено
- Драйверы: 0 Запуск, 0 Завершено
- Статус: ALIVE