Подтвердить что ты не робот

Spark читать файл из S3 с помощью sc.textFile("s3n://...)

Попытка прочитать файл, расположенный в S3, с использованием искровой оболочки:

scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log")
lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12

scala> myRdd.count
java.io.IOException: No FileSystem for scheme: s3n
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    ... etc ...

Исключение IOException: Нет схемы FileSystem для схемы: s3n произошла с:

  • Искры 1.31 или 1.40 на dev машине (нет Hadoop libs)
  • Выполняется с Hortonworks Sandbox HDP v2.2.4 (Hadoop 2.60), который интегрирует Spark 1.2.1 из коробки
  • Использование схемы s3://или s3n://

В чем причина этой ошибки? Отсутствует зависимость, Отсутствует конфигурация или неправильное использование sc.textFile()?

Или может быть, это связано с ошибкой, которая влияет на конструкцию Spark, специфичную для Hadoop 2.60, поскольку это сообщение , как представляется. Я собираюсь попробовать Spark для Hadoop 2.40, чтобы убедиться, что это решает проблему.

4b9b3361

Ответ 1

Подтверждено, что это связано с конструкцией Spark против Hadoop 2.60. Просто установлен Spark 1.4.0 "Предназначен для Hadoop 2.4 и более поздних версий" (вместо Hadoop 2.6). И теперь код работает нормально.

sc.textFile("s3n://bucketname/Filename") теперь вызывает еще одну ошибку:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

В приведенном ниже коде используется формат URL S3, чтобы показать, что Spark может читать файл S3. Использование dev-машины (без Hadoop libs).

scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:[email protected]/SafeAndSound_Lyrics.txt")
lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21

scala> lyrics.count
res1: Long = 9

Даже лучше: приведенный выше код с учетными данными AWS, встроенными в URI S3N, будет нарушен, если у секретного ключа AWS есть "/". Настройка учетных данных AWS в SparkContext будет исправлена. Код работает независимо от того, является ли файл S3 общедоступным или закрытым.

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/"
val myRDD = sc.textFile("s3n://myBucket/MyFilePattern")
myRDD.count

Ответ 2

Несмотря на то, что этот вопрос уже принят, я думаю, что точные детали того, почему это происходит, по-прежнему отсутствуют. Поэтому я думаю, что может быть место для еще одного ответа.

Если вы добавите требуемую зависимость hadoop-aws, ваш код должен работать.

Запуск Hadoop 2.6.0, соединитель s3 FS был перемещен в отдельную библиотеку с именем hadoop-aws. Для этого есть и Джира: Переместить связанный с s3 код соединителя FS в hadoop-aws.

Это означает, что любая версия искры, которая была построена против Hadoop 2.6.0 или новее, должна будет использовать другую внешнюю зависимость, чтобы иметь возможность подключиться к файловой системе S3.
Вот пример sbt, который я пробовал и работает как ожидалось, используя Apache Spark 1.6.2, построенную против Hadoop 2.6.0:

libraryDependencies + = "org.apache.hadoop" % "hadoop-aws" % "2.6.0"

В моем случае я столкнулся с некоторыми проблемами с зависимостями, поэтому я решил добавить исключение:

libraryDependencies + = "org.apache.hadoop" % "hadoop-aws" % "2.6.0" exclude ( "tomcat", "jasper-compiler" ) excludeAll ExclusionRule (organization = "javax.servlet" )

В другой связанной заметке я еще не попробовал, но рекомендуется использовать файловую систему "s3a", а не "s3n", начиная с Hadoop 2.6.0.

Третье поколение, s3a: файловая система. Предназначен для замены s3n:, эта привязка файловой системы поддерживает более крупные файлы и promises более высокую производительность.

Ответ 3

Вы можете добавить параметр --пакеты с помощью соответствующей банки: к вашему представлению:

bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py

Ответ 4

Это примерный искровой код, который может читать файлы, присутствующие на s3

val hadoopConf = sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", s3Key)
hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret)
var jobInput = sparkContext.textFile("s3://" + s3_location)

Ответ 5

Ran в ту же проблему в Spark 2.0.2. Решил его, подав ему банки. Вот что я побежал:

$ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar

scala> val hadoopConf = sc.hadoopConfiguration
scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId)
scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey)
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> sqlContext.read.parquet("s3://your-s3-bucket/")

очевидно, вам нужно иметь банки на пути, где вы используете искровую оболочку из

Ответ 6

Для Spark 1.4.x "Предварительно построено для Hadoop 2.6 и более поздних версий":

Я просто скопировал необходимые S3, S3native пакеты из hadoop-aws-2.6.0.jar, чтобы искровой сборочно-1.4.1-hadoop2.6.0.jar.

После этого я перезапустил искровой кластер, и он работает. Не забудьте проверить владельца и режим сборщика.

Ответ 7

S3N не является файлом по умолчанию. Вам нужно создать версию Spark с версией Hadoop, в которой есть дополнительные библиотеки, используемые для совместимости с AWS. Дополнительная информация, которую я нашел здесь, https://www.hakkalabs.co/articles/making-your-local-hadoop-more-like-aws-elastic-mapreduce

Ответ 8

Вероятно, вам нужно использовать схему s3a:/вместо s3:/или s3n:/ Однако он не работает из коробки (для меня) для искровой оболочки. Я вижу следующую стек:

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
        at $iwC$$iwC$$iwC.<init>(<console>:37)
        at $iwC$$iwC.<init>(<console>:39)
        at $iwC.<init>(<console>:41)
        at <init>(<console>:43)
        at .<init>(<console>:47)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
        ... 68 more

Что я думаю - вам нужно вручную добавить зависимость hadoop-aws вручную http://search.maven.org/#artifactdetails|org.apache.hadoop|hadoop-aws|2.7.1|jar Но я не знаю, как добавить его в искровой оболочки.

Ответ 9

Существует Spark JIRA, SPARK-7481, открытый на сегодняшний день, октябрь 20, 2016, чтобы добавить модуль искрового облака, который включает транзитивные зависимости от всех s3a и azure wasb: потребность, а также тесты.

И Spark PR, чтобы соответствовать. Вот как я получаю поддержку s3a в своих искровых построениях

Если вы делаете это вручную, вы должны получить JOA-хауп-аш-версии точной версии, которую остальная часть вашего JAR-хауса имеет, а версия AWS JARs на 100% синхронизирована с тем, что Hadoop aws было скомпилировано. Для Hadoop 2.7. {1, 2, 3,...}

hadoop-aws-2.7.x.jar 
aws-java-sdk-1.7.4.jar
joda-time-2.9.3.jar
+ jackson-*-2.6.5.jar

Вставьте все это в SPARK_HOME/jars. Запустите искру с вашими учетными данными, установленными в Env vars или в spark-default.conf

Простейший тест - вы можете сделать количество строк файла CSV

val landsatCSV = "s3a://landsat-pds/scene_list.gz"
val lines = sc.textFile(landsatCSV)
val lineCount = lines.count()

Получить номер: все хорошо. Получить трассировку стека. Плохая новость.

Ответ 10

USe s3a вместо s3n. У меня была аналогичная проблема на работе Hadoop. После переключения с s3n на s3a он работал.

например.

S3A://myBucket/myFile1.log

Ответ 11

Я столкнулся с той же проблемой. Он отлично работал после установки значения для fs.s3n.impl и добавления зависимости hasoop-aws.

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")