Подтвердить что ты не робот

Поддерживает ли Spark истинное сканирование столбцов по файлам паркета в S3?

Одним из больших преимуществ формата хранения данных Parquet является it columnar. Если у меня есть "широкий" набор данных с сотнями столбцов, но мой запрос затрагивает лишь некоторые из них, тогда можно прочитать только те данные, которые хранят эти несколько столбцов, и пропустить остальные.

Предположительно эта функция работает, читая немного метаданных во главе файла паркета, который указывает места в файловой системе для каждого столбца. Затем читатель может искать на диске для чтения только необходимые столбцы.

Кто-нибудь знает, правильно ли работает исправитель паркета по умолчанию, который правильно реализует этот выборочный поиск на S3? Я думаю, он поддерживается S3, но существует большая разница между теоретической поддержкой и реализацией, которая должным образом использует эту поддержку.

4b9b3361

Ответ 1

Это необходимо разбить

  • Получает ли код Паркета предикаты от искры (да)
  • Затем паркет пытается выборочно читать только эти столбцы, используя вызовы Hadoop FileSystem seek() + read() или readFully(position, buffer, length)? Да
  • Соединяет ли S3-порт эти операции с файлами в эффективные HTTP-запросы GET? В Amazon EMR: Да. В Apache Hadoop вам нужно hasoop 2.8 на пути к классам и правильно установить spark.hadoop.fs.s3a.experimental.fadvise=random для запуска произвольного доступа.

Hadoop 2.7 и ранее обрабатывают агрессивный поиск() вокруг файла плохо, потому что они всегда инициируют смещение-конец файла GET, удивляются следующему поиску, должны прервать это соединение, повторно открыть новый TCP/Соединение HTTPS 1.1 (медленное, CPU тяжелое), повторите это снова, повторно. Случайная операция ввода-вывода повреждает объемную загрузку таких вещей, как .csv.gz, но имеет решающее значение для получения ORC/Parquet perf.

Вы не получите ускорение на Hadoop 2.7 hadoop-aws JAR. Если вам это нужно, вам нужно обновить hasoop *.jar и зависимости, или создать Spark с нуля против Hadoop 2.8

Обратите внимание, что Hadoop 2.8+ также имеет приятную небольшую функцию: если вы вызываете toString() на клиентской файловой системе S3A в операторе журнала, он распечатывает всю статистику IO файловой системы, включая, сколько данных было отброшено в поисках, прервано Соединения TCP & c. Помогает вам понять, что происходит.

Ответ 2

ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: У меня нет окончательного ответа и я не хочу также выступать в качестве авторитетного источника, но потратил некоторое время на поддержку паркета в Spark 2.2+, и я надеюсь, что мой ответ поможет нам всем получить ближе к правильному ответу.


Не удается ли Parquet на S3 вытащить данные из неиспользуемых столбцов из S3 и получить только нужные ему фрагменты файлов или вытащить весь файл?

Я использую Spark 2.3.0-SNAPSHOT, который я создал сегодня прямо из мастера.

parquet формат источника данных обрабатывается ParquetFileFormat, который является FileFormat.

Если я прав, читающая часть обрабатывается методом buildReaderWithPartitionValues ​​ (который переопределяет FileFormat).

buildReaderWithPartitionValues используется исключительно при запросе физического оператора FileSourceScanExec для так называемых входных RDD, которые на самом деле являются единственным RDD для генерации внутренних строк при выполнении WholeStageCodegenExec.

С учетом сказанного, я думаю, что рассмотрение того, что buildReaderWithPartitionValues, может приблизиться к окончательному ответу.

Когда вы посмотрите на строку, вы можете быть уверены, что мы на правильном пути.

//Попробуйте нажать фильтры, когда включен режим фильтрации.

Этот путь кода зависит от свойства spark.sql.parquet.filterPushdown Spark, что включен по умолчанию.

spark.sql.parquet.filterPushdown. Включает оптимизацию push-down фильтра Паркета при значении true.

Это приводит нас к parquet-hadoop ParquetInputFormat.setFilterPredicate, если фильтры определены.

if (pushed.isDefined) {
  ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}

Код становится интереснее немного позже, когда фильтры используются, когда код возвращается к паркет-мр (вместо использования так называемого векторизованного считывателя декодирования паркета). Это часть, которую я действительно не понимаю (кроме того, что я вижу в коде).

Обратите внимание, что для векторизованного считывателя декодирования паркета управляется свойство spark.sql.parquet.enableVectorizedReader Spark, которое включено по умолчанию.

СОВЕТ. Чтобы узнать, какая часть выражения if используется, включите DEBUG уровень ведения журнала для org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat регистратора.

Чтобы увидеть все вытолкнутые фильтры, вы можете включить INFO уровень ведения журнала org.apache.spark.sql.execution.FileSourceScanExec logger. Вы должны видеть в журналах следующее:

INFO Pushed Filters: [pushedDownFilters]

Надеюсь, что если это не будет окончательным ответом, это немного помогло, и кто-то подхватил его, где я остановился, чтобы сделать это в ближайшее время. Надежда умирает последней:)

Ответ 3

Нет, предикат pushdown не поддерживается полностью. Это, конечно, зависит от:

  • Конкретный вариант использования
  • Искра версия
  • Тип и версия разъема S3

Чтобы проверить конкретный вариант использования, вы можете включить уровень журнала DEBUG в Spark и запустить запрос. Затем вы можете увидеть, есть ли "запросы" во время запросов S3 (HTTP), а также сколько запросов было отправлено. Что-то вроде этого:

17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1[\r][\n]" .... 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 0-7472093/7472094[\r][\n]" .... 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 7472094[\r][\n]"

Вот пример отчета о проблеме, который был недавно открыт из-за невозможности Spark 2.1 рассчитать COUNT(*) всех строк в наборе данных на основе метаданных, хранящихся в файле Parquet: https://issues.apache.org/jira/browse/SPARK-21074

Ответ 4

паркет-считыватель искры точно так же, как и любой другой InputFormat,

  • Ни один из inputFormat не имеет специальной функции для S3. Форматы ввода могут считывать из LocalFileSystem, Hdfs и S3 для этой цели специальная оптимизация.

  • Parquet InpuTFormat в зависимости от запрашиваемых столбцов выборочно читает столбцы для вас.

  • Если вы хотите быть уверенным (хотя предикаты push down работают в последней искровой версии), вручную выберите столбцы и напишите преобразование и действия, а не в зависимости от SQL