Подтвердить что ты не робот

Искра: запись DataFrame как сжатого JSON

Apache Spark DataFrameReader.json() может обрабатывать файлы gzipped JSONlines автоматически, но, похоже, нет способа получить DataFrameWriter.json() для записи сжатых файлов JSONlines. Дополнительный сетевой ввод-вывод очень дорог в облаке.

Есть ли способ решить эту проблему?

4b9b3361

Ответ 1

В следующих решениях используется pyspark, но я предполагаю, что код в Scala будет похож.

Первый вариант - установить следующее, когда вы инициализируете SparkConf:

conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

С кодом выше любого файла, который вы производите, используя этот sparkContext, автоматически сжимается с помощью gzip.

Второй вариант, если вы хотите сжать только выбранные файлы в вашем контексте. Допустим, что "df" - это ваш фрейм данных и имя файла для вашего назначения:

df_rdd = self.df.toJSON() 
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

Ответ 2

С Spark 2.X(и, возможно, раньше я не тестировал) существует более простой способ записи сжатого JSON, который не требует изменения конфигурации:

val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")

Это также работает для CSV и для Parquet, просто используйте .csv() и .parquet() вместо .json(), чтобы записать файл после установки опции сжатия.

Возможные кодеки: none, bzip2, deflate, gzip, lz4 и snappy.

Ответ 3

Установка параметров сжатия в SparkConf НЕ является хорошей практикой, как принятый ответ. Это изменило поведение глобально вместо того, чтобы указывать настройки для каждого файла отдельно. Правда в том, что явное всегда лучше, чем неявное. Есть также некоторые случаи, когда пользователи не могут легко манипулировать конфигурацией контекста, например spark-shell или в кодах, разработанных как подмодуль другого.

Правильный путь

Написание DataFrame со сжатием поддерживается начиная с Spark 1.4. Несколько способов достичь этого:

Один

df.write.json("filename.json", compression="gzip")

Это! Просто используйте DataFrameWriter.json() как вы хотите.

Волшебство скрыто в коде pyspark/sql/readwriter.py

@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
    """Saves the content of the :class:'DataFrame' in JSON format
    ('JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>'_) at the
    specified path.

    :param path: the path in any Hadoop supported file system
    :param mode: ...

    :param compression: compression codec to use when saving to file. This can be one of the
                        known case-insensitive shorten names (none, bzip2, gzip, lz4,
                        snappy and deflate).
    :param dateFormat: ...
    :param timestampFormat: ...

    >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
    """
    self.mode(mode)
    self._set_opts(
        compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
    self._jwrite.json(path)

Поддерживаемые форматы сжатия: bzip2, gzip, lz4, snappy и deflate, без учета регистра.

Scala API должен быть таким же.

Другая

df.write.options(compression="gzip").json("filename.json")

Подобно тому, как указано выше. в качестве аргументов ключевых слов можно указать больше параметров. доступно начиная с Spark 1.4.

В третьих

df.write.option("compression", "gzip").json("filename.json")

DataFrameWriter.option() добавлен начиная с DataFrameWriter.option() Spark 1.5. Только один параметр может быть добавлен за один раз.

Ответ 4

Очень хороший ответ @ttimasdf. Однако опция # 1 не работает при развертывании HortonWorks в версии 1.6.3. Варианты № 2 и № 3 работают хорошо.