Подтвердить что ты не робот

Как использовать источник JDBC для записи и чтения данных в (Py) Spark?

Цель этого вопроса - документ:

  • шаги, необходимые для чтения и записи данных с использованием соединений JDBC в PySpark

  • возможные проблемы с источниками JDBC и известными решениями

При небольших изменениях эти методы должны работать с другими поддерживаемыми языками, включая Scala и R.

4b9b3361

Ответ 1

Запись данных

  1. Включите соответствующий драйвер JDBC при отправке приложения или запуске оболочки. Вы можете использовать, например, --packages:

    bin/pyspark --packages group:name:version  
    

    или комбинируя driver-class-path и jars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
    

    Эти свойства также можно установить с PYSPARK_SUBMIT_ARGS переменной среды PYSPARK_SUBMIT_ARGS до PYSPARK_SUBMIT_ARGS экземпляра JVM или с помощью conf/spark-defaults.conf для установки spark.jars.packages или spark.jars/spark.driver.extraClassPath.

  2. Выберите нужный режим. Spark JDBC Writer поддерживает следующие режимы:

    • append: добавить содержимое этого: class: DataFrame к существующим данным.
    • overwrite: перезаписать существующие данные.
    • ignore: игнорировать эту операцию, если данные уже существуют.
    • error (случай по умолчанию): выдает исключение, если данные уже существуют.

    Upserts или другие мелкозернистые модификации не поддерживаются

    mode = ...
    
  3. Подготовьте URI JDBC, например:

    # You can encode credentials in URI or pass
    # separately using properties argument
    # of jdbc method or options
    
    url = "jdbc:postgresql://localhost/foobar"
    
  4. (Необязательно) Создайте словарь аргументов JDBC.

    properties = {
        "user": "foo",
        "password": "bar"
    }
    

    properties/options могут быть использованы для установки поддерживаемых свойств соединения JDBC.

  5. Используйте DataFrame.write.jdbc

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

    сохранить данные (подробности см. в pyspark.sql.DataFrameWriter).

Известные проблемы:

  • Подходящий драйвер не может быть найден, если драйвер был включен с помощью --packages (java.sql.SQLException: No suitable driver found for jdbc:...)

    Если для решения этой проблемы нет версии драйвера, вы можете добавить класс driver в properties. Например:

    properties = {
        ...
        "driver": "org.postgresql.Driver"
    }
    
  • использование df.write.format("jdbc").options(...).save() может привести к:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource не позволяет создавать таблицы как выбранные.

    Решение неизвестно.

  • в Pyspark 1.3 вы можете попробовать вызвать метод Java напрямую:

    df._jdf.insertIntoJDBC(url, "baz", True)
    

Чтение данных

  1. Выполните шаги 1-4 из записи данных
  2. Используйте sqlContext.read.jdbc:

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

    или sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
    

Известные проблемы и ошибки:

Где найти подходящие драйверы:

Другие опции

В зависимости от базы данных может существовать специализированный источник, который в некоторых случаях предпочтителен:

Ответ 2

Загрузите драйвер mysql-connector-java и сохраните в папке с искровым jar, обратите внимание на приведенный ниже код python, записывая данные в "acotr1", мы должны создать структуру таблицы acotr1 в базе данных mysql

    spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()

    sc = spark.sparkContext

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)

    df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="Ramyam01").load()

    mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=Ramyam01"

    df.write.jdbc(mysql_url,table="actor1",mode="append")

Ответ 3

Отправьте эту ссылку, чтобы загрузить jdbc для postgres, и следуйте инструкциям по загрузке jar файла.

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar файл будет загружен на пути, подобном этому. "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"

Если ваша искробеговая версия 2

from pyspark.sql import SparkSession

spark = SparkSession.builder
        .appName("sparkanalysis")
        .config("spark.driver.extraClassPath",
         "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
        .getOrCreate()

//for localhost database//

pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "[email protected]") \
.load()


print(pgDF)

pgDF.filter(pgDF["user_id"]>5).show()

сохраните файл как python и запустите "python соответствующимfilename.py"