Подтвердить что ты не робот

Оптимальный способ создания трубопровода ml в Apache Spark для набора данных с большим количеством столбцов

Я работаю с Spark 2.1.1 в наборе данных с функциями ~ 2000 и пытается создать базовый ML-трубопровод, состоящий из некоторых трансформаторов и классификаторов.

Предположим для простоты, что Pipeline, с которым я работаю, состоит из VectorAssembler, StringIndexer и классификатора, что было бы довольно распространенным методом использования.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

Если шаги трубопровода разделены на конвейер трансформатора (VectorAssembler + StringIndexer) и второй конвейер классификатора, и если ненужные столбцы отбрасываются между обоими конвейерами, обучение завершается успешно. Это означает, что для повторного использования моделей необходимо подготовить две модели PipelineModels после обучения, и необходимо ввести промежуточный этап предварительной обработки.

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

Решение (imho) гораздо более чистое, чтобы объединить все этапы трубопровода в один трубопровод.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

Однако размещение всех PipelineStages в одном конвейере приводит к следующему исключению, возможно, из-за проблемы this PR в конечном итоге решит:

ERROR CodeGenerator: не удалось скомпилировать: org.codehaus.janino.JaninoRuntimeException: Постоянный пул для класса org.apache.spark.sql.catalyst.expressions.GeneratedClass $SpecificUnsafeProjection вырос за пределом JVM 0xFFFF

Причиной этого является то, что VectorAssembler эффективно удваивает (в этом примере) объем данных в DataFrame, так как нет трансформатора, который мог бы удалить ненужные столбцы. (См. ассемблер вектора искрового конвейера удаляет другие столбцы)

В примере приведен пример golub dataset и требуются следующие шаги предварительной обработки:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

Поскольку я новичок в Spark, я не уверен, что было бы лучшим способом решить эту проблему. Вы бы предложили...

  • создать новый трансформатор, который отбрасывает столбцы и может быть включен в конвейер?С >
  • разделите оба трубопровода и введите промежуточный шаг
  • что-нибудь еще?:)

Или я не вижу ничего важного (шаги трубопровода, PR и т.д.), которые могли бы решить эту проблему?


Edit:

Я внедрил новый Transformer DroppingVectorAssembler, который удаляет ненужные столбцы, однако вызывается одно и то же исключение.

Кроме того, установка spark.sql.codegen.wholeStage на false не решает проблему.

4b9b3361

Ответ 1

Ошибка janino объясняется количеством постоянных переменных, созданных в процессе оптимизации. Максимальный предел постоянных переменных, допускаемых в JVM, равен ((2 ^ 16) -1). Если этот предел превышен, вы получите Constant pool for class ... has grown past JVM limit of 0xFFFF

JIRA, которая исправит эту проблему, будет SPARK-18016, но она все еще выполняется в настоящее время.

Ваш код, скорее всего, не работает на этапе VectorAssembler, когда он должен выполнять работу с тысячами столбцов во время одной задачи оптимизации.

Обходной путь, который я разработал для этой проблемы, - это создать "вектор векторов", работая против подмножеств столбцов, а затем объединяя результаты в конце для создания уникального вектор-функции. Это предотвращает превышение одной цели оптимизации одной переменной константы JVM. Это не изящно, но я использовал его на наборах данных, попадающих в диапазон 10 тыс. Столбцов.

Этот метод также позволяет вам сохранять один конвейер, хотя для его работы требуются дополнительные шаги (создание подвекторов). После того, как вы создали вектор объектов из подвекторов, вы можете по желанию удалить исходные столбцы источника.

Пример кода:

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Create first example dataframe
val exampleDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", 
        "col6", "col7", "col8", "col9", "colA", "colB", 
        "colC", "colD", "colE", "colF", "colG", "colH", 
        "colI", "colJ", "colK")

// Create multiple column lists using the sliding method
val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray

// Create a vector assembler for each column list
val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")

// Create a vector assembler using column list vectors as input
val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")

// Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))

// Fit and transform the data
val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)

// Get the number of features in "features" vector
val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))

// Print number of features in "features vector"
print(featureLength)

(Примечание. Метод создания списков столбцов должен выполняться программно, но я придерживался этого простого примера для понимания концепции.)

Ответ 2

Ошибка janino, которую вы получаете, связана с тем, что в зависимости от набора функций сгенерированный код становится больше.

Я бы отделил шаги в разных конвейерах и сбросил ненужные функции, сохранил промежуточные модели, такие как StringIndexer и OneHotEncoder, и загрузил их во время этапа прогнозирования, что также полезно, потому что преобразования будут быстрее для данных, которые должен быть предсказан.

Наконец, вам не нужно сохранять столбцы функций после запуска этапа VectorAssembler, поскольку он преобразует функции в столбцы feature vector и label, и это все, что вам нужно для запуска прогнозов.

Пример трубопровода в Scala с сохранением промежуточных этапов - (более старый API-интерфейс)

Кроме того, если вы используете более старую версию искры, например, 1.6.0, вам нужно проверить исправленную версию, например, 2.1.1 или 2.2.0 или 1.6.4, иначе вы столкнулись бы с ошибкой janino даже при использовании 400.