Подтвердить что ты не робот

Использование Spark для записи файла паркета в s3 по s3a очень медленно

Я пытаюсь записать файл parquet на Amazon S3 с помощью Spark 1.6.1. Небольшой parquet, который я создаю, ~2GB, когда он написан, не так много данных. Я пытаюсь доказать Spark как платформу, которую я могу использовать.

В основном, я собираюсь настроить star schema на dataframes, тогда я собираюсь записать эти таблицы в паркет. Данные поступают из файлов csv, предоставляемых поставщиком, и я использую Spark как платформу ETL. В настоящее время у меня есть кластер 3 node в ec2(r3.2xlarge) So 120GB памяти исполнителей и всего 16 ядер.

Входные файлы составляют около 22 ГБ, и сейчас я извлекаю около 2 ГБ данных. В конце концов, это будет много терабайт, когда я начну загружать полный набор данных.

Вот моя искра / scala pseudocode:

  def loadStage(): Unit = {
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
    var sqlCtx = new SQLContext(sc)


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    //Setup header table/df
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")


    //Setup fact table/df
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")


    println(results.count())



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")


  }

Счет занимает около 2 минут для 465884512 строк. Запись в паркет занимает 38 минут

Я понимаю, что coalesce делает перетасовку с драйвером, который пишет. но количество времени, которое он принимает, заставляет меня думать, что я делаю что-то серьезно неправильно. Без coalesce это все равно занимает 15 минут, а IMO все еще слишком длинный и дает мне тонну небольших файлов parquet. Я хотел бы иметь один большой файл в день данных, которые у меня будут. У меня также есть код для разбиения на значение поля, и он так же медленный. Я также попытался вывести это на csv и занимает ~ 1 час.

Кроме того, я не устанавливаю реквизиты времени выполнения, когда я отправляю свою работу. Моя статистика консоли для одного задания:

  • Живые рабочие: 2
  • Используемые ядра: 16 Всего, 16 Используется
  • Используемая память: 117,5 ГБ Всего, 107,5 ГБ Используется
  • Приложения: 1 Запуск, 5 Завершено
  • Драйверы: 0 Запуск, 0 Завершено
  • Статус: ALIVE
4b9b3361

Ответ 1

Значения по умолчанию Spark приводят к большому количеству (возможно) ненужных накладных расходов во время операций ввода-вывода, особенно при записи в S3. В этой статье это обсуждается более подробно, но есть два параметра, которые вы бы хотели изменить.

  • Использование DirectParquetOutputCommitter. По умолчанию Spark сохраняет все данные во временную папку, а затем перемещает эти файлы. Использование DirectParquetOutputCommitter сэкономит время за счет прямой записи в выходной путь S3

    • Больше не доступен в Spark 2. 0+
      • Как указано в билете JIRA, текущее решение заключается в
        1. Переключите ваш код на использование s3a и Hadoop 2.7. 2+; все лучше, лучше в Hadoop 2.8 и является основой для s3guard
        2. Используйте Hadoop FileOutputCommitter и установите для mapreduce.fileoutputcommitter.algorithm.version значение 2

    -Schema объединение отключено по умолчанию с Spark 1.5 Отключитьобъединение схем. Если объединение схем включено, узел драйвера будет сканировать все файлы для обеспечения согласованности схемы. Это особенно дорого, потому что это не распределенная операция. Убедитесь, что это отключено, выполнив

    val file = sqx.read.option("mergeSchema", "false").parquet(path)

Ответ 2

Прямой выходной коммиттер ушел из искробезопасной базы; вы должны написать свой собственный/воскресить удаленный код в своем JAR. ЕСЛИ вы это сделаете, отключите спекуляцию в своей работе и узнайте, что другие сбои также могут вызвать проблемы, где проблема - "недопустимые данные".

В яркой заметке Hadoop 2.8 собирается добавить некоторые ускорения S3A специально для чтения оптимизированных двоичных форматов (ORC, Parquet) с S3; Подробнее см. HADOOP-11694. И некоторые люди работают над использованием Amazon Dynamo для последовательного хранилища метаданных, который должен иметь возможность выполнять надежную O (1) фиксацию в конце работы.

Ответ 3

Одним из непосредственных подходов для ускорения записи Spark в S3 является использование оптимизатора EMRFS S3.

Однако если вы используете s3a, этот коммиттер не может быть использован:

Когда оптимизатор EMRFS S3 не используется

Исполнитель не используется при следующих обстоятельствах:

When writing to HDFS

-> When using the S3A file system

When using an output format other than Parquet, such as ORC or text

When using MapReduce or Spark RDD API

Я тестировал эту разницу на AWS EMR 5.26, и использование s3://было на 15% -30% быстрее, чем s3a://(но все еще медленно).

Самый быстрый способ, которым мне удалось выполнить такое копирование/запись, - это записать Parquet в локальную HDFS, а затем использовать s3distcp для копирования в S3; в одном конкретном сценарии (несколько сотен небольших файлов) это было в 5 раз быстрее, чем запись DataFrame в Parquet непосредственно на S3.

Ответ 4

У меня тоже была эта проблема. В дополнение к тому, что сказали остальные, вот полное объяснение от AWS: https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/

Во время моего эксперимента просто переход на FileOutCommiter v2 (с v1) улучшил запись в 3-4 раза.

self.sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")