Использование Spark 1.4.0, Scala 2.10
Я пытался выяснить способ пересылки пустых значений вперед с помощью последнего известного наблюдения, но я не вижу простого способа. Я думаю, что это довольно распространенная вещь, но не могу найти пример, показывающий, как это сделать.
Я вижу функции для прямого заполнения значения NaN значением, или функции отставания/опережения для заполнения или смещения данных смещением, но ничего, чтобы подобрать последнее известное значение.
Заглядывая онлайн, я вижу много вопросов о том же в R, но не в Spark/Scala.
Я думал о сопоставлении диапазона дат, отфильтровывал NaN из результатов и выбирал последний элемент, но, думаю, я не совсем понимаю синтаксис.
Используя DataFrames я пытаюсь что-то вроде
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
но это никуда меня не приведет.
Часть фильтра не работает; функция map возвращает Sequence of spark.sql.Columns, но функция фильтра ожидает возврата логического значения, поэтому мне нужно получить значение из Column для тестирования, но, похоже, существуют только методы Column, которые возвращают Column.
Есть ли способ сделать это более просто на Spark?
Спасибо за ваш вклад
Редактировать:
Простой пример примера ввода:
2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...
Ожидаемый результат:
2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22
Примечание:
- У меня есть много столбцов, многие из которых имеют этот шаблон данных, но не в ту же дату/время. Если мне нужно, я буду делать преобразование по одному столбцу за раз.
РЕДАКТИРОВАТЬ:
После ответа @zero323 я попробовал так:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = { !row.isNullAt(1) }
val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
val toCarryBd = sc.broadcast(toCarry)
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }
val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}
широковещательная переменная заканчивается списком значений без нулей. Этот прогресс, но я все еще не могу заставить работать карту. но я ничего не получаю, потому что индекс i
в не отображается на исходные данные, он отображается на подмножество без нуля.
Что мне здесь не хватает?
РЕДАКТИРОВАТЬ и решение (как видно из @zero323 answer):
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))
См. Ноль 323 ответа ниже для получения дополнительных опций, если вы используете RDD вместо DataFrames. Решение выше, возможно, не самое эффективное, но работает для меня. Если вы хотите оптимизировать, проверьте решение RDD.