Подтвердить что ты не робот

Как читать входные данные S3 в кластерном приложении Spark Streaming EC2

Я пытаюсь сделать приложение Spark Streaming своим вводом из каталога S3, но я получаю это исключение после запуска его с помощью spark-submit script:

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy6.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:195)
    at MainClass$.main(MainClass.scala:1190)
    at MainClass.main(MainClass.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Я устанавливаю эти переменные через этот блок кода, как предлагается здесь http://spark.apache.org/docs/latest/ec2-scripts.html (внизу страницы):

val ssc = new org.apache.spark.streaming.StreamingContext(
  conf,
  Seconds(60))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",args(2))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",args(3))

args (2) и args (3) - это мой идентификатор ключа доступа AWS и секретный ключ доступа, конечно.

Почему он продолжает говорить, что они не установлены?

EDIT: Я тоже так пытался, но получаю то же исключение:

val lines = ssc.textFileStream("s3n://"+ args(2) +":"+ args(3) + "@<mybucket>/path/")
4b9b3361

Ответ 1

Нечетный. Попробуйте также сделать .set на sparkContext. Попробуйте также экспортировать переменные env перед запуском приложения:

export AWS_ACCESS_KEY_ID=<your access>
export AWS_SECRET_ACCESS_KEY=<your secret>

^^ Вот как мы это делаем.

ОБНОВЛЕНИЕ: Согласно @tribbloid выше вышло в 1.3.0, теперь вам нужно долгое время сталкиваться с hdfs-site.xml, или ваш может (и это работает в искровой оболочке):

val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)

Ответ 2

Следующая конфигурация работает для меня, убедитесь, что вы также установили "fs.s3.impl":

val conf = new SparkConf().setAppName("Simple Application").setMaster("local")      
val sc = new SparkContext(conf)
val hadoopConf=sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey",mySecretKey)

Ответ 3

В AWS EMR приведенные выше предложения не сработали. Вместо этого я обновил следующие свойства в файле conf/core-site.xml:

fs.s3n.awsAccessKeyId и fs.s3n.awsSecretAccessKey с вашими учетными данными S3.

Ответ 4

Для тех, кто использует EMR, используйте конструкцию Spark, как описано в https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark, и просто ссылку S3 с URI s3://. Не нужно устанавливать реализацию S3 или дополнительную конфигурацию, поскольку учетные данные задаются IAM или ролью.

Ответ 5

Я хотел бы сделать учетные данные более безопасными в файле конфигурации на одном из моих зашифрованных разделов. Поэтому я сделал export HADOOP_CONF_DIR=~/Private/.aws/hadoop_conf перед запуском моего искрового приложения и поместил файл в этот каталог (зашифрованный через ecryptfs) под названием core-site.xml, содержащий учетные данные, подобные этому:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>my_aws_access_key_id_here</value>
  </property>
  <property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>my_aws_secret_access_key_here</value>
  </property>
</configuration>

HADOOP_CONF_DIR также можно установить в conf/spark-env.sh.

Ответ 6

Последние выпуски EMR (протестированные на 4.6.0) требуют следующей конфигурации:

val sc = new SparkContext(conf)
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)

Хотя в большинстве случаев из конфигурации окна должно работать - это, если у вас есть разные учетные данные S3 из тех, с которыми вы запускали кластер.

Ответ 7

это работает для меня в оболочке 1.4.1:

val conf = sc.getConf
conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
conf.set("spark.hadoop.fs.s3.awsAccessKeyId", <your access key>)
conf.set("spark.hadoop.fs.s3.awsSecretAccessKey", <your secret key>)
SparkHadoopUtil.get.conf.addResource(SparkHadoopUtil.get.newConfiguration(conf))
...
sqlContext.read.parquet("s3://...")

Ответ 8

Augmenting @nealmcb ответ, самый простой способ сделать это - определить

HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop 

в conf/spark-env.sh или экспортируйте эту переменную env в ~/.bashrc или ~/.bash_profile.

Это будет работать до тех пор, пока вы сможете получить доступ к s3 через hadoop. Например, если вы можете запустить

hadoop fs -ls s3n://path/

то hasoop может видеть путь s3.

Если hadoop не видит путь, следуйте советам, содержащимся в Как я могу получить доступ к S3/S3n из локальной установки Hadoop 2.6?