Подтвердить что ты не робот

Как проверить контрольные точки DataFrames?

Я ищу способ проверки контрольных точек DataFrames. В настоящее время контрольная точка работает с RDD, но я не могу найти, как это сделать с помощью DataFrames. persist и cache (которые являются синонимами друг для друга) доступны для DataFrame, но они не "ломают линию" и, таким образом, непригодны для методов, которые могут зацикливаться на сотни (или тысячи) итераций.

В качестве примера предположим, что у меня есть список функций, сигнатура которых - DataFrame = > DataFrame. Я хочу иметь способ вычислить следующее, даже если myfunctions содержит сотни или тысячи записей:

def foo(dataset: DataFrame, g: DataFrame => Unit) =
    myfunctions.foldLeft(dataset) {
        case (df, f) =>
            val nextDF = f(df)
            g(nextDF)
            nextDF
   }
4b9b3361

Ответ 1

TL; DR:. Для версий Spark до 1.6, чтобы получить "контрольную точку DF", мое предложенное решение основано на другом ответе, но с одной дополнительной строкой:

df.rdd.checkpoint
df.rdd.count
val df2 = sqlContext.createDataFrame(df.rdd, df.schema)
// df2 is checkpointed

Объяснение

Обновлено после дальнейших исследований.

Как указано, контрольная точка DataFrame напрямую отсутствует (Spark 1.6.1), хотя есть issue для это на Spark Jira.

Таким образом, возможное обходное решение - это предложение, предложенное в другом ответе:

df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count // An action to compute the checkpoint

Однако при таком подходе будет проверяться только объект df.rdd. Это можно проверить, позвонив toDebugString в df.rdd:

 scala> df.rdd.toDebugString
 (32) MapPartitionsRDD[1] at rdd at <console>:38 []
  |   ReliableCheckpointRDD[2] at count at <console>:38 []

а затем вызывая toDebugString после быстрого преобразования в df (обратите внимание, что я создал свой DataFrame из источника JDBC), возвращает следующее:

scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
 |   MapPartitionsRDD[4] at rdd at <console>:38 []
 |   JDBCRDD[3] at rdd at <console>:38 []

df.explain также показывает подсказку:

scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)

Итак, чтобы фактически получить "контрольно-контрольный" DataFrame, я могу думать только о создании нового из контрольного RDD:

val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or
val newDF = df.rdd.map { 
  case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")

Затем мы можем проверить, что новый DataFrame "проверен":

1) newDF.explain:

scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]

2) newDF.rdd.toDebugString:

scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

3) С преобразованием:

scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
 |   MapPartitionsRDD[11] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

Кроме того, я попробовал несколько более сложных преобразований, и на практике я смог проверить, что объект newDF был проверен.

Таким образом, единственный способ, которым я нашел надежную контрольную точку DataFrame, был путем проверки связанного с ним RDD и создания из него нового объекта DataFrame.

Надеюсь, это поможет. Приветствия.

Ответ 3

Я думаю, что сейчас вам придется делать

sc.setCheckpointDir("/DIR")
df.rdd.checkpoint

И тогда вам нужно будет выполнить свое действие в базовом df.rdd. Вызов df.ACTION не будет работать в настоящее время, только df.rdd.ACTION

Ответ 4

Расширение ответа Ассафа Мендельсона,

На сегодняшний день Spark версии 2.2, DataSet#checkpoint() API Развитие и эксперимент

Использование:

Перед контрольной точкой CheckpointDir следует указывать с помощью SparkContext

spark.sparkContext.setCheckpointDir("checkpoint/dir/location")

val ds: Dataset[Long] = spark.range(10).repartition('id % 2)

// do checkpoint now, it will preserve partition also
val cp: Dataset[Long] = ds.checkpoint()

Как работает внутренне?

Пока реализация для контрольной точки DataSet заключается в том, чтобы преобразовать DataSet в RDD, а затем выполнить контрольную точку.

// In DataSet.scala 

//API we used in example  
def checkpoint(): Dataset[T] = checkpoint(eager = true)

//Base implementation
def checkpoint(eager: Boolean): Dataset[T] = {
    val internalRdd = queryExecution.toRdd.map(_.copy())
    internalRdd.checkpoint()

    if (eager) {
      internalRdd.count() //To materialize DataSet immediately on checkpoint() call
    }

  ...
}