Скажем, у меня есть DataFrame (который я читал из csv на HDFS), и я хочу обучать ему некоторые алгоритмы через MLlib. Как преобразовать строки в LabeledPoints или иным образом использовать MLlib в этом наборе данных?
Использование DataFrame с MLlib
Ответ 1
Предполагая, что вы используете Scala:
Скажем, получим DataFrame
следующим образом:
val results : DataFrame = sqlContext.sql(...)
Шаг 1: вызов results.printSchema()
- это покажет вам не только столбцы в DataFrame и (это важно) их порядок, но и то, что Spark SQL считает их типами. Как только вы видите этот выход, все становится намного менее загадочным.
Шаг 2. Получите RDD[Row]
из DataFrame
:
val rows: RDD[Row] = results.rdd
Шаг 3. Теперь это просто вопрос вытягивания любых полей, которые вас интересуют из отдельных строк. Для этого вам нужно знать позицию на основе 0 каждого поля и ее тип, и, к счастью, вы получили все, что было на шаге 1 выше. Например,
скажем, вы сделали SELECT x, y, z, w FROM ...
и распечатали схему, полученную
root
|-- x double (nullable = ...)
|-- y string (nullable = ...)
|-- z integer (nullable = ...)
|-- w binary (nullable = ...)
И скажем все, что вы хотели использовать x
и z
. Вы можете вывести их в RDD[(Double, Integer)]
следующим образом:
rows.map(row => {
// x has position 0 and type double
// z has position 2 and type integer
(row.getDouble(0), row.getInt(2))
})
Здесь вы просто используете Core Spark для создания соответствующих объектов MLlib. Все может немного усложниться, если ваш SQL возвращает столбцы типа массива, и в этом случае вам придется вызывать getList(...)
для этого столбца.
Ответ 2
Предполагая, что вы используете JAVA (Искра версия 1.6.2):
Вот простой пример кода JAVA с использованием DataFrame для машинного обучения.
-
Он загружает JSON со следующей структурой,
[{ "label": 1, "att2": 5.037089672359123, "att1": 2.4100883023159456},...]
-
разделяет данные на обучение и тестирование,
- обучите модель, используя данные поезда,
- применить модель к тестовым данным и
- сохраняет результаты.
Кроме того, согласно официальной документации "API-интерфейс на основе DataFrame является основным API" для MLlib с текущей версии 2.0.0. Таким образом, вы можете найти несколько примеров, используя DataFrame.
Код:
SparkConf conf = new SparkConf().setAppName("MyApp").setMaster("local[2]");
SparkContext sc = new SparkContext(conf);
String path = "F:\\SparkApp\\test.json";
String outputPath = "F:\\SparkApp\\justTest";
System.setProperty("hadoop.home.dir", "C:\\winutils\\");
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().json(path);
df.registerTempTable("tmp");
DataFrame newDF = df.sqlContext().sql("SELECT att1, att2, label FROM tmp");
DataFrame dataFixed = newDF.withColumn("label", newDF.col("label").cast("Double"));
VectorAssembler assembler = new VectorAssembler().setInputCols(new String[]{"att1", "att2"}).setOutputCol("features");
StringIndexer indexer = new StringIndexer().setInputCol("label").setOutputCol("labelIndexed");
// Split the data into training and test
DataFrame[] splits = dataFixed.randomSplit(new double[] {0.7, 0.3});
DataFrame trainingData = splits[0];
DataFrame testData = splits[1];
DecisionTreeClassifier dt = new DecisionTreeClassifier().setLabelCol("labelIndexed").setFeaturesCol("features");
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {assembler, indexer, dt});
// Train model
PipelineModel model = pipeline.fit(trainingData);
// Make predictions
DataFrame predictions = model.transform(testData);
predictions.rdd().coalesce(1,true,null).saveAsTextFile("justPlay.txt" +System.currentTimeMillis());