Я работаю с Spark 2.1.1 в наборе данных с функциями ~ 2000 и пытается создать базовый ML-трубопровод, состоящий из некоторых трансформаторов и классификаторов.
Предположим для простоты, что Pipeline, с которым я работаю, состоит из VectorAssembler, StringIndexer и классификатора, что было бы довольно распространенным методом использования.
// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("featuresRaw")
val labelIndexer: StringIndexer = new StringIndexer()
.setInputCol("TARGET")
.setOutputCol("indexedLabel")
// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("featuresRaw")
.setMaxBins(30)
// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(5))
.addGrid(rf.maxDepth, Array(5))
.build()
// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setLabelCol("indexedLabel")
Если шаги трубопровода разделены на конвейер трансформатора (VectorAssembler + StringIndexer) и второй конвейер классификатора, и если ненужные столбцы отбрасываются между обоими конвейерами, обучение завершается успешно. Это означает, что для повторного использования моделей необходимо подготовить две модели PipelineModels после обучения, и необходимо ввести промежуточный этап предварительной обработки.
// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)
val mainPipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator()
.setEstimator(mainPipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]
Решение (imho) гораздо более чистое, чтобы объединить все этапы трубопровода в один трубопровод.
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, assmbleFeatures, rf))
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
// This will fail!
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]
Однако размещение всех PipelineStages в одном конвейере приводит к следующему исключению, возможно, из-за проблемы this PR в конечном итоге решит:
ERROR CodeGenerator: не удалось скомпилировать: org.codehaus.janino.JaninoRuntimeException: Постоянный пул для класса org.apache.spark.sql.catalyst.expressions.GeneratedClass $SpecificUnsafeProjection вырос за пределом JVM 0xFFFF
Причиной этого является то, что VectorAssembler эффективно удваивает (в этом примере) объем данных в DataFrame, так как нет трансформатора, который мог бы удалить ненужные столбцы. (См. ассемблер вектора искрового конвейера удаляет другие столбцы)
В примере приведен пример golub dataset и требуются следующие шаги предварительной обработки:
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)
// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)
// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))
// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")
Поскольку я новичок в Spark, я не уверен, что было бы лучшим способом решить эту проблему. Вы бы предложили...
-
создать новый трансформатор, который отбрасывает столбцы и может быть включен в конвейер?С > - разделите оба трубопровода и введите промежуточный шаг
- что-нибудь еще?:)
Или я не вижу ничего важного (шаги трубопровода, PR и т.д.), которые могли бы решить эту проблему?
Edit:
Я внедрил новый Transformer DroppingVectorAssembler
, который удаляет ненужные столбцы, однако вызывается одно и то же исключение.
Кроме того, установка spark.sql.codegen.wholeStage
на false
не решает проблему.