Подтвердить что ты не робот

SQL по Spark Streaming

Это код для запуска простых SQL-запросов по Spark Streaming.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration

object StreamingSQL {

  case class Persons(name: String, age: Int)

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))

    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people/")
    lines.foreachRDD(rdd=>rdd.foreach(println))

    val sqc = new SQLContext(sc);
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created

    lines.foreachRDD(rdd=>{
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

Как вы можете видеть, для запуска SQL поверх потоковой передачи запрос должен выполняться внутри метода foreachRDD. Я хочу запустить SQL-соединение над данными, полученными из двух разных потоков. Есть ли способ сделать это?

4b9b3361

Ответ 1

Хорошо, я хотел бы подытожить обходной путь, по которому мы пришли после обсуждения в ответ Spiro. Его предложение сначала создать пустой стол, а затем вставить в него RDD. Единственная проблема заключается в том, что Spark не позволяет вставлять в таблицы еще. Вот что можно сделать:

Сначала создайте RDD с той же схемой, что и ожидаемая из вашего потока:

import sqlContext.createSchemaRDD
val d1=sc.parallelize(Array(("a",10),("b",3))).map(e=>Rec(e._1,e._2))

Затем сохраните его как файл Паркет

d1.saveAsParquetFile("/home/p1.parquet")

Теперь загрузите файл паркета и зарегистрируйте его как таблицу с помощью метода registerAsTable().

val parquetFile = sqlContext.parquetFile("/home/p1.parquet")
parquetFile.registerAsTable("data")

Теперь, когда вы получаете свой поток, просто примените foreachRDD() в своем потоке и продолжайте вставлять отдельные RDD в таблицу, созданную выше, используя insertInto() метод

dStream.foreachRDD(rdd=>{
rdd.insertInto("data")
})

Эта функция insertInto() отлично работает и позволяет собирать данные в таблицу. Теперь вы можете сделать то же самое для любого количества потоков, а затем запустить свои запросы.

Ответ 2

Как вы написали свой код, в итоге вы создаете последовательность небольших SchemaRDD при каждом запуске SQL-запроса. Хитрость заключается в том, чтобы сохранить каждый из них либо в накопительном RDD, либо в таблице накопления.

Сначала, подход таблицы, используя insertInto:

Для каждого из ваших потоков сначала создайте резервную копию emty, которую вы регистрируете в виде таблицы, и получите пустую таблицу. Для вашего примера позвольте сказать, что вы называете это "allTeenagers".

Затем для каждого из ваших запросов используйте метод SchemaRDD insertInto, чтобы добавить результат в эту таблицу:

teenagers.insertInto("allTeenagers")

Если вы сделаете это с обоими потоками, создав две отдельные таблицы накопления, вы можете присоединиться к ним, используя простой старый SQL-запрос.

(Примечание: на самом деле я не смог заставить его работать, и небольшой поиск заставляет меня сомневаться в том, что у кого-то еще есть, но я уверен, что понял смысл дизайна insertInto, поэтому Я думаю, что это решение стоит записать.)

Второй, подход unionAll (также есть метод union, но это делает сложнее получить правильные типы):

Это связано с созданием исходного RDD - снова позвоните ему allTeenagers.

// create initial SchemaRDD even if it empty, so the types work out right
var allTeenagers = sqc.sql("SELECT ...")

Затем каждый раз:

val teenagers = sqc.sql("SELECT ...")
allTeenagers = allTeenagers.unionAll(teenagers)

Возможно, нет необходимости говорить, что вам нужны столбцы, чтобы соответствовать.