Подтвердить что ты не робот

Как подключиться к базе данных postgreSQL в Apache Spark с помощью scala?

Я хочу знать, как я могу выполнить следующие действия в scala?

  • Подключитесь к базе данных postgreSQL, используя Spark scala.
  • Введите SQL-запросы, такие как SELECT, UPDATE и т.д., чтобы изменить таблицу в этой базы данных.

Я знаю, как это сделать, используя scala, но как импортировать jar для соединителя psql scala в sbt при его упаковке?

4b9b3361

Ответ 1

Наша цель - запустить параллельные SQL-запросы от рабочих Spark.

Настройка сборки

Добавьте соединитель и JDBC в libraryDependencies в build.sbt. Я только пробовал это с MySQL, поэтому я буду использовать это в своих примерах, но Postgres должен быть одинаков.

libraryDependencies ++= Seq(
  jdbc,
  "mysql" % "mysql-connector-java" % "5.1.29",
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)

код

Когда вы создаете SparkContext, вы указываете, какие банки копировать исполнителям. Включите соединительную банку. Хороший способ сделать это:

val classes = Seq(
  getClass,                   // To get the jar with our own code.
  classOf[mysql.jdbc.Driver]  // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)

Теперь Spark готов к подключению к базе данных. Каждый исполнитель выполнит часть запроса, чтобы результаты были готовы к распределенным вычислениям.

Для этого есть два варианта. Более старый подход заключается в использовании org.apache.spark.rdd.JdbcRDD:

val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => {
    sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  },
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,
  row => row.getString("BOOK_TITLE")
)

Проверьте документацию по параметрам. Вкратце:

  • У вас есть SparkContext.
  • Затем создается функция, которая создает соединение. Это будет вызвано для каждого рабочего для подключения к базе данных.
  • Затем запрос SQL. Это должно быть похоже на пример и содержать заполнители для стартового и конечного ключей.
  • Затем вы указываете диапазон ключей (от 0 до 1000 в моем примере) и количество разделов. Диапазон будет разделен между разделами. Таким образом, один пример исполнителя завершит выполнение SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100 в этом примере.
  • И, наконец, у нас есть функция, которая преобразует ResultSet во что-то. В этом примере мы преобразуем его в String, поэтому вы получите RDD[String].

Так как Apache Spark версии 1.3.0 другой метод доступен через API DataFrame. Вместо JdbcRDD вы создадите org.apache.spark.sql.DataFrame:

val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "dbtable" -> "BOOKS"))

См. https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases для полного списка опций (диапазон клавиш и количество разделов можно установить так же, как с помощью JdbcRDD).

Обновление

JdbcRDD не поддерживает обновления. Но вы можете просто сделать их в foreachPartition.

rdd.foreachPartition { it =>
  val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
  for (bookTitle <- it) {
    del.setString(1, bookTitle)
    del.executeUpdate
  }
}

(Это создает одно соединение для каждого раздела. Если это проблема, используйте пул соединений!)

DataFrame поддерживает обновления с помощью методов createJDBCTable и insertIntoJDBC.