Подтвердить что ты не робот

Получение Spark, Python и MongoDB для совместной работы

У меня возникают трудности с тем, чтобы эти компоненты правильно вязали. У меня есть Spark, установленный и работающий успешно, я могу запускать задания локально, автономно, а также через YARN. Я следил за инструкциями (насколько мне известно) здесь и здесь

Я работаю над Ubuntu, и у меня есть различные версии компонентов

У меня возникли трудности с выполнением различных шагов, например, какие банки добавить к какому пути, поэтому я добавил

  • in /usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce Я добавил mongo-hadoop-core-1.5.0-SNAPSHOT.jar
  • следующие переменные среды
    • export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
    • export PATH=$PATH:$HADOOP_HOME/bin
    • export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
    • export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
    • export PATH=$PATH:$SPARK_HOME/bin

Моя программа Python является базовой

from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)
    rdd = sc.mongoRDD(
        'mongodb://username:[email protected]:27017/mydb.mycollection')

if __name__ == '__main__':
    main()

Я запускаю его, используя команду

$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py

и в результате получается следующий результат

Traceback (most recent call last):
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
    main()
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
    rdd = sc.mongoRDD('mongodb://username:[email protected]:27017/mydb.mycollection')
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
    return self.mongoPairRDD(connection_string, config).values()
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
    _ensure_pickles(self)
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
    orig_tb)
py4j.protocol.Py4JError

В соответствии с здесь

Это исключение возникает, когда в клиенте Java возникает исключение код. Например, если вы попытаетесь вытащить элемент из пустого стека. Экземпляр генерируемого исключения Java хранится в java_exception.

Рассматривая исходный код для pymongo_spark.py и строку, бросающую ошибку, он говорит

"Ошибка при общении с JVM. Является ли банкомат MongoDB Spark Spark CLASSPATH?:"

Итак, в ответ я пытался убедиться, что правильные банки сданы, но я мог бы сделать все это неправильно, см. ниже

$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py

Я импортировал pymongo в ту же программу python, чтобы проверить, что я могу хотя бы обратиться к MongoDB, используя это, и могу.

Я знаю, что здесь есть немало движущихся частей, поэтому, если я могу предоставить более полезную информацию, пожалуйста, дайте мне знать.

4b9b3361

Ответ 1

Обновления:

2016-07-04

С момента последнего обновления MongoDB Spark Connector созрел довольно много. Он предоставляет обновленные бинарные файлы и API, основанный на источниках данных, но использует конфигурацию SparkConf поэтому он субъективно менее гибкий, чем Stratio/Spark-MongoDB.

2016-03-30

После первоначального ответа я нашел два разных способа подключения к MongoDB от Spark:

В то время как первый, по-видимому, относительно незрелый, последний выглядит намного лучше, чем разъем Mongo-Hadoop, и предоставляет Spark SQL API.

# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
  .format("com.stratio.datasource.mongodb")
  .options(host="mongo:27017", database="foo", collection="bar")
  .load())

df.show()

## +---+----+--------------------+
## |  x|   y|                 _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+

Он кажется намного более стабильным, чем mongo-hadoop-spark, поддерживает предикат pushdown без статической конфигурации и просто работает.

Оригинальный ответ:

Действительно, здесь есть немало движущихся частей. Я попытался сделать это немного более управляемым, построив простое изображение Docker, которое примерно соответствует описанной конфигурации (хотя для краткости я опутал библиотеки Hadoop). Вы можете найти полный источник на GitHub (DOI 10.5281/zenodo.47882) и построить его с нуля:

git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .

или загрузите изображение, которое я нажал на Docker Hub, чтобы вы могли просто docker pull zero323/mongo-spark):

Начать образы:

docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash

Запуск оболочки оболочки PySpark --jars и --driver-class-path:

pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}

И, наконец, посмотрим, как это работает:

import pymongo
import pymongo_spark

mongo_url = 'mongodb://mongo:27017/'

client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
    {"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()

pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
    .map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()

## [(1.0, -1.0), (0.0, 4.0)]

Обратите внимание, что mongo-hadoop, похоже, закрывает соединение после первого действия. Таким образом, вызывая, например, rdd.count() после того, как сбор выдаст исключение.

Основываясь на разных проблемах, с которыми я столкнулся, создавая этот образ, я склонен полагать, что прохождение mongo-hadoop-1.5.0-SNAPSHOT.jar и mongo-hadoop-spark-1.5.0-SNAPSHOT.jar как --jars и --driver-class-path - единственное жесткое требование.

Примечания:

  • Это изображение свободно основано на jaceklaskowski/docker-spark, поэтому, пожалуйста, не забудьте отправить какую-то хорошую карму @jacek-laskowski, если это поможет.
  • Если не требуется версия разработки, включая новый API, то использование --packages, скорее всего, является лучшим вариантом.

Ответ 2

Можете ли вы попробовать использовать --package вместо --jars ... в команде spark-submit:

spark-submit --packages org.mongodb.mongo-hadoop:mongo-hadoop-core:1.3.1,org.mongodb:mongo-java-driver:3.1.0 [REST OF YOUR OPTIONS]

Некоторые из этих файлов jar не являются банками Uber и требуют загрузки большего количества зависимостей, прежде чем они смогут работать.

Ответ 3

У меня была такая же проблема вчера. Удалось зафиксировать его, разместив mongo-java-driver.jar в $HADOOP_HOME/lib и mongo-hadoop-core.jar и mongo-hadoop-spark.jar в $HADOOP_HOME/spark/classpath/emr (или любую другую папку, которая находится в $SPARK_CLASSPATH).

Сообщите мне, если это поможет.

Ответ 4

Удачи!

@see https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage

from pyspark import SparkContext, SparkConf

import pymongo_spark
# Important: activate pymongo_spark.
pymongo_spark.activate()


def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)

    # Create an RDD backed by the MongoDB collection.
    # This RDD *does not* contain key/value pairs, just documents.
    # If you want key/value pairs, use the mongoPairRDD method instead.
    rdd = sc.mongoRDD('mongodb://localhost:27017/db.collection')

    # Save this RDD back to MongoDB as a different collection.
    rdd.saveToMongoDB('mongodb://localhost:27017/db.other.collection')

    # You can also read and write BSON:
    bson_rdd = sc.BSONFileRDD('/path/to/file.bson')
    bson_rdd.saveToBSON('/path/to/bson/output')

if __name__ == '__main__':
    main()