Подтвердить что ты не робот

Есть ли способ добавить дополнительные метаданные для фреймов Spark?

Можно ли добавить дополнительные метаданные в DataFrame s?

Причина

У меня есть Spark DataFrame, для которого мне нужно сохранить дополнительную информацию. Пример: A DataFrame, для которого я хочу "запомнить" самый высокий используемый индекс в столбце id Integer.

Текущее решение

Я использую отдельный DataFrame для хранения этой информации. Конечно, сохранение этой информации по отдельности утомительно и подвержено ошибкам.

Есть ли лучшее решение для хранения такой дополнительной информации на DataFrame s?

4b9b3361

Ответ 1

Чтобы развернуть и Scala -fy nealmcb answer (вопрос был помечен scala, а не python, поэтому я не думаю, что этот ответ будет неактуальным или избыточным), предположим, что у вас есть DataFrame:

import org.apache.spark.sql
val df = sc.parallelize(Seq.fill(100) { scala.util.Random.nextInt() }).toDF("randInt")

И каким-то образом получить максимальное или любое другое, что вы хотите memoize в DataFrame:

val randIntMax = df.rdd.map { case sql.Row(randInt: Int) => randInt }.reduce(math.max)

sql.types.Metadata может содержать только строки, булевы, некоторые типы чисел и другие структуры метаданных. Поэтому нам нужно использовать Long:

val metadata = new sql.types.MetadataBuilder().putLong("columnMax", randIntMax).build()

DataFrame.withColumn() фактически имеет перегрузку, которая позволяет снабжать аргумент метаданных в конце, но необъяснимо отмечена [private], поэтому мы просто делаем то, что она делает - используйте Column.as(alias, metadata):

val newColumn = df.col("randInt").as("randInt_withMax", metadata)
val dfWithMax = df.withColumn("randInt_withMax", newColumn)

dfWithMax теперь имеет (столбец с) метаданные, которые вы хотите!

dfWithMax.schema.foreach(field => println(s"${field.name}: metadata=${field.metadata}"))
> randInt: metadata={}
> randInt_withMax: metadata={"columnMax":2094414111}

Или программно и безопасно (например, Metadata.getLong() и другие не возвращают Option и могут вызывать исключение "key not found" ):

dfWithMax.schema("randInt_withMax").metadata.getLong("columnMax")
> res29: Long = 209341992

Прикрепление max к столбцу имеет смысл в вашем случае, но в общем случае с привязкой метаданных к DataFrame, а не к столбцу, в частности, вам нужно будет взять маршрут обертки, описанный другими ответами.

Ответ 2

Как и в Spark 1.2, схемы StructType имеют атрибут metadata, который может содержать произвольное отображение/словарь информации для каждого столбца в Dataframe. Например. (при использовании с отдельной библиотекой spark-csv):

customSchema = StructType([
  StructField("cat_id", IntegerType(), True,
    {'description': "Unique id, primary key"}),
  StructField("cat_title", StringType(), True,
    {'description': "Name of the category, with underscores"}) ])

categoryDumpDF = (sqlContext.read.format('com.databricks.spark.csv')
 .options(header='false')
 .load(csvFilename, schema = customSchema) )

f = categoryDumpDF.schema.fields
["%s (%s): %s" % (t.name, t.dataType, t.metadata) for t in f]

["cat_id (IntegerType): {u'description': u'Unique id, primary key'}",
 "cat_title (StringType): {u'description': u'Name of the category, with underscores.'}"]

Это было добавлено в [SPARK-3569] Добавить поле метаданных в StructField - ASF JIRA и предназначено для использования в конвейерах машинного обучения для отслеживания информации о сохраненных функциях в столбцах, таких как категориальные/непрерывные, категории чисел, карта категории к индексу. См. SPARK-3569: добавьте поле метаданных в конструкторский документ StructField.

Я бы хотел, чтобы это использовалось более широко, например. для описания и документирования столбцов, единицы измерения, используемой в столбце, информации о координатной оси и т.д.

Проблемы включают в себя то, как соответствующим образом сохранять или обрабатывать информацию метаданных при преобразовании столбца, как обрабатывать несколько видов метаданных, как сделать все возможное и т.д.

В интересах тех, кто думает о расширении этой функциональности в фреймах Spark, я ссылаюсь на некоторые аналогичные обсуждения вокруг Pandas.

Например, см. xray - довести помеченную мощность данных pandas до физических наук, которая поддерживает метаданные для помеченных массивов.

И посмотрите обсуждение метаданных для pandas на Разрешить прикрепление настраиваемых метаданных к панели /df/series? · Проблема № 2485 · pydata/pandas.

См. также раздел, посвященный единицам: ENH: единица измерения/физические величины · Проблема №10349 · pydata/ pandas

Ответ 3

Если вы хотите иметь менее утомительную работу, я думаю, вы можете добавить неявное преобразование между DataFrame и вашей пользовательской оболочкой (хотя еще не протестировали его).

   implicit class WrappedDataFrame(val df: DataFrame) {
        var metadata = scala.collection.mutable.Map[String, Long]()

        def addToMetaData(key: String, value: Long) {
           metadata += key -> value
        }
     ...[other methods you consider useful, getters, setters, whatever]...
      }

Если неявная оболочка находится в области DataFrame, вы можете просто использовать обычный DataFrame, как если бы это была ваша оболочка, то есть.:

df.addtoMetaData("size", 100)

Этот способ также изменяет ваши метаданные, поэтому вам не следует принудительно вычислять его только один раз и переносить его.

Ответ 4

Я бы сохранил обертку вокруг вашего фрейма. Например:

case class MyDFWrapper(dataFrame: DataFrame, metadata: Map[String, Long])
val maxIndex = df1.agg("index" ->"MAX").head.getLong(0)
MyDFWrapper(df1, Map("maxIndex" -> maxIndex))

Ответ 5

Многие люди видели слово "метаданные" и сразу переходили к "метаданным столбца". Это не то, что вы хотели, и не то, что я хотел, когда у меня была похожая проблема. В конечном счете, проблема здесь заключается в том, что DataFrame является неизменной структурой данных, которая, когда бы ни выполнялась операция над ней, передает данные, а остальная часть DataFrame - нет. Это означает, что вы не можете просто поместить оболочку на него, потому что, как только вы выполняете операцию, вы получаете совершенно новый DataFrame (потенциально совершенно нового типа, особенно с тенденциями Scala/Spark к неявным преобразованиям). Наконец, если DataFrame когда-либо избегает своей оболочки, нет способа восстановить метаданные из DataFrame.

У меня была эта проблема в Spark Streaming, которая фокусируется на RDD (а также на базовой структуре данных DataFrame) и пришла к одному простому выводу: единственное место для хранения метаданных - это имя RDD. Имя RDD никогда не используется базовой системой Spark, за исключением отчетов, поэтому его можно повторно использовать. Затем вы можете создать свою обертку на основе имени RDD с явным преобразованием между любым DataFrame и вашей оберткой с метаданными.

К сожалению, это все еще оставляет вас с проблемой неизменности и новых RDD, создаваемых с каждой операцией. Имя RDD (наше поле метаданных) теряется с каждым новым RDD. Это означает, что вам нужен способ повторно добавить имя к вашему новому СДР. Это можно решить, предоставив метод, который принимает функцию в качестве аргумента. Он может извлечь метаданные перед функцией, вызвать функцию и получить новый RDD/DataFrame, а затем назвать его метаданными:

def withMetadata(fn: (df: DataFrame) => DataFrame): MetaDataFrame = {
  val meta = df.rdd.name
  val result = fn(wrappedFrame)
  result.rdd.setName(meta)
  MetaDataFrame(result)
}

Ваш класс упаковки (MetaDataFrame) может предоставить удобные методы для анализа и установки значений метаданных, а также неявные преобразования между Spark DataFrame и MetaDataFrame. Пока вы выполняете все свои мутации с помощью метода withMetadata, ваши метаданные будут распространяться через весь конвейер преобразования. Да, использование этого метода для каждого вызова немного хлопотно, но простая реальность заключается в том, что в Spark нет концепции первоклассных метаданных.