Подтвердить что ты не робот

Spark/Scala: форвардная заливка с последним наблюдением

Использование Spark 1.4.0, Scala 2.10

Я пытался выяснить способ пересылки пустых значений вперед с помощью последнего известного наблюдения, но я не вижу простого способа. Я думаю, что это довольно распространенная вещь, но не могу найти пример, показывающий, как это сделать.

Я вижу функции для прямого заполнения значения NaN значением, или функции отставания/опережения для заполнения или смещения данных смещением, но ничего, чтобы подобрать последнее известное значение.

Заглядывая онлайн, я вижу много вопросов о том же в R, но не в Spark/Scala.

Я думал о сопоставлении диапазона дат, отфильтровывал NaN из результатов и выбирал последний элемент, но, думаю, я не совсем понимаю синтаксис.

Используя DataFrames я пытаюсь что-то вроде

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)

но это никуда меня не приведет.

Часть фильтра не работает; функция map возвращает Sequence of spark.sql.Columns, но функция фильтра ожидает возврата логического значения, поэтому мне нужно получить значение из Column для тестирования, но, похоже, существуют только методы Column, которые возвращают Column.

Есть ли способ сделать это более просто на Spark?

Спасибо за ваш вклад

Редактировать:

Простой пример примера ввода:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...

Ожидаемый результат:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22

Примечание:

  1. У меня есть много столбцов, многие из которых имеют этот шаблон данных, но не в ту же дату/время. Если мне нужно, я буду делать преобразование по одному столбцу за раз.

РЕДАКТИРОВАТЬ:

После ответа @zero323 я попробовал так:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd


    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }

    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
   case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap

    val toCarryBd = sc.broadcast(toCarry)

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }

    val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}

широковещательная переменная заканчивается списком значений без нулей. Этот прогресс, но я все еще не могу заставить работать карту. но я ничего не получаю, потому что индекс i в не отображается на исходные данные, он отображается на подмножество без нуля.

Что мне здесь не хватает?

РЕДАКТИРОВАТЬ и решение (как видно из @zero323 answer):

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))

См. Ноль 323 ответа ниже для получения дополнительных опций, если вы используете RDD вместо DataFrames. Решение выше, возможно, не самое эффективное, но работает для меня. Если вы хотите оптимизировать, проверьте решение RDD.

4b9b3361

Ответ 1

Начальный ответ (одно предположение о временном ряду):

Прежде всего, попробуйте избежать функций окна, если вы не можете предоставить предложение PARTITION BY. Он перемещает данные в один раздел, поэтому большую часть времени это просто невозможно.

Что вы можете сделать, это заполнить пробелы на RDD с помощью mapPartitionsWithIndex. Поскольку вы не представили пример данных или ожидаемого вывода, считайте это псевдокодом не реальной программой Scala:

  • сначала позволяет упорядочить DataFrame по дате и преобразовать в RDD

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    val rows: RDD[Row] = df.orderBy($"Date").rdd
    
  • next позволяет найти последнее не нулевое наблюдение за раздел

    def notMissing(row: Row): Boolean = ???
    
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) => 
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap
    
  • и преобразуйте этот Map в широковещательную рассылку

    val toCarryBd = sc.broadcast(toCarry)
    
  • окончательно перечислить разделы, снова заполнив пробелы:

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // If it is the beginning of partition and value is missing
      // extract value to fill from toCarryBd.value
      // Remember to correct for empty / only missing partitions
      // otherwise take last not-null from the current partition
    }
    
    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
    
  • наконец, вернитесь к DataFrame

Изменить (секционированные/временные ряды по групповым данным):

Дьявол в деталях. Если ваши данные разбиты на разделы, тогда вся проблема может быть решена с помощью groupBy. Предположим, вы просто разделите столбец "v" типа T и Date - это целая временная метка:

def fill(iter: List[Row]): List[Row] = {
  // Just go row by row and fill with last non-empty value
  ???
}

val groupedAndSorted = df.rdd
  .groupBy(_.getAs[T]("k"))
  .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))

val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)

val dfFilled = sqlContext.createDataFrame(rows, df.schema)

Таким образом, вы можете одновременно заполнить все столбцы.

Можно ли это сделать с помощью DataFrames вместо преобразования взад и вперед в RDD?

Это зависит, хотя вряд ли это будет эффективно. Если максимальный зазор относительно невелик, вы можете сделать что-то вроде этого:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column

val maxGap: Int = ???  // Maximum gap between observations
val columnsToFill: List[String] = ???  // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed 

// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
  // Generate lag values between 1 and maxGap
  val lags = (1 to maxGap).map(lag(col(c), _)over(w))
  // Add current, coalesce and set alias
  coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}


// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))


// Finally select
val dfImputed = df.select($"*" :: lags: _*)

Его можно легко настроить, чтобы использовать разный максимальный зазор на столбе.

Более простой способ добиться аналогичного результата в последней версии Spark - использовать last с ignoreNulls:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"k").orderBy($"Date")
  .rowsBetween(Window.unboundedPreceding, -1)

df.withColumn("value", coalesce($"value", last($"value", true).over(w)))

В то время как можно отказаться от предложения partitionBy и применить этот метод глобально, это было бы слишком дорогостоящим с большими наборами данных.