Подтвердить что ты не робот

Spark sql Dataframe - импорт sqlContext.implicits._

У меня есть основной, который создает контекст искры:

    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

Затем создает фреймворк данных, а также фильтры и проверки на кадре данных.

    val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00")

    val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0))
    // record length cannot be < 2 
    .na.drop(3)
    // round to hours
    .withColumn("time",convertToHourly($"time"))

Это отлично работает.

НО Когда я попытаюсь перенести мои проверки на другой файл, отправив dataframe в

function ValidateAndTransform(df: DataFrame) : DataFrame = {...}

который получает Dataframe и выполняет проверки и преобразования: похоже, мне нужен

 import sqlContext.implicits._

Чтобы избежать ошибки: "значение $не является членом StringContext" что происходит в режиме онлайн:          .withColumn( "время", convertToHourly ( $ "время" ))

Но использовать import sqlContext.implicits._ Мне также нужен sqlContext, определенный в новом файле, например:

val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

или отправить его на

function ValidateAndTransform(df: DataFrame) : DataFrame = {...}
function

Мне кажется, что разделение, которое я пытаюсь сделать с 2-мя файлами (main и validation), выполняется неправильно...

Любая идея о том, как это сделать? Или просто отправить sqlContext в функцию?

Спасибо!

4b9b3361

Ответ 1

Вы можете работать с экземпляром singleton SQLContext. Вы можете посмотреть этот пример в искровом репозитории

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
...
//And wherever you want you can do
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._