Подтвердить что ты не робот

Как создать правильный фрейм данных для классификации в Spark ML

Я пытаюсь запустить произвольную классификацию леса, используя Spark ML api, но у меня возникают проблемы с созданием правильного ввода данных в конвейер.

Вот пример данных:

age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"

age и hours_per_week являются целыми числами, а другие функции, включая метку payRange, являются категориальными (String)

Загрузка этого файла csv (позволяет вызвать его sample.csv) может быть выполнена Spark csv library следующим образом:

val data = sqlContext.csvFile("/home/dusan/sample.csv")

По умолчанию все столбцы импортируются как строка, поэтому нам нужно изменить "age" и "hours_per_week" на Int:

val toInt    = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))

Просто, чтобы проверить, как выглядит схема:

scala> dataFixed.printSchema
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salaryRange: string (nullable = true)

Затем задайте перекрестный валидатор и конвейер:

val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf)) 
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

Ошибка при запуске этой строки:

val cmModel = cv.fit(dataFixed)

java.lang.IllegalArgumentException: Поле "функции" не существует.

В столбце RandomForestClassifier можно установить столбец столбцов и столбцов функций, однако у меня есть 4 столбца в качестве предикторов (функций) не только один.

Как я должен организовать свой фрейм данных, чтобы он правильно размещал столбцы ярлыков и функций?

Для вашего удобства здесь полный код:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vector, Vectors}


object SampleClassification {

  def main(args: Array[String]): Unit = {

    //set spark context
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._
    import com.databricks.spark.csv._

    //load data by using databricks "Spark CSV Library" 
    val data = sqlContext.csvFile("/home/dusan/sample.csv")

    //by default all columns are imported as string so we need to change "age" and  "hours_per_week" to Int
    val toInt    = udf[Int, String]( _.toInt)
    val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))


    val rf = new RandomForestClassifier()

    val pipeline = new Pipeline().setStages(Array(rf))

    val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

    // this fails with error
    //java.lang.IllegalArgumentException: Field "features" does not exist.
    val cmModel = cv.fit(dataFixed) 
  }

}

Спасибо за помощь!

4b9b3361

Ответ 1

Вам просто нужно убедиться, что в вашем фрейме данных есть столбец "features", который имеет тип VectorUDF, как показано ниже:

scala> val df2 = dataFixed.withColumnRenamed("age", "features")
df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string]

scala> val cmModel = cv.fit(df2) 
java.lang.IllegalArgumentException: requirement failed: Column features must be of type [email protected] but was actually IntegerType.
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
    at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
    at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
    at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164)
    at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
    at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)

EDIT1

По существу, в вашем фрейме данных должны присутствовать два параметра для вектора признаков и "метки", например, метки. Экземпляр должен иметь тип Double.

Чтобы создать поля "функции" с типом Vector, сначала создайте udf как показано ниже:

val toVec4    = udf[Vector, Int, Int, String, String] { (a,b,c,d) => 
  val e3 = c match {
    case "hs-grad" => 0
    case "bachelors" => 1
    case "masters" => 2
  }
  val e4 = d match {case "male" => 0 case "female" => 1}
  Vectors.dense(a, b, e3, e4) 
}

Теперь, чтобы закодировать поле "label", создайте еще один udf, как показано ниже:

val encodeLabel    = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )

Теперь мы преобразуем исходный фрейм данных, используя эти два udf:

val df = dataFixed.withColumn(
  "features",
  toVec4(
    dataFixed("age"),
    dataFixed("hours_per_week"),
    dataFixed("education"),
    dataFixed("sex")
  )
).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label")

Обратите внимание, что в кадре данных могут присутствовать дополнительные столбцы/поля, но в этом случае я выбрал только features и label:

scala> df.show()
+-------------------+-----+
|           features|label|
+-------------------+-----+
|[38.0,40.0,0.0,0.0]|  0.0|
|[28.0,40.0,1.0,1.0]|  0.0|
|[52.0,45.0,0.0,0.0]|  1.0|
|[31.0,50.0,2.0,1.0]|  1.0|
|[42.0,40.0,1.0,0.0]|  1.0|
+-------------------+-----+

Теперь вам нужно установить правильные параметры для вашего алгоритма обучения, чтобы он работал.

Ответ 2

Как и Spark 1.4, вы можете использовать Transformer org.apache.spark.ml.feature.VectorAssembler. Просто укажите имена столбцов, которые вы хотите использовать.

val assembler = new VectorAssembler()
  .setInputCols(Array("col1", "col2", "col3"))
  .setOutputCol("features")

и добавьте его в свой конвейер.

Ответ 3

В соответствии с инструкцией по искрообразованию на mllib - случайных деревьях мне кажется, что вы должны определить карту функций, которую вы используете, и точки должны быть помечены.

Это скажет алгоритму, какой столбец должен использоваться как предсказание, а какие - это функции.

https://spark.apache.org/docs/latest/mllib-decision-tree.html