Подтвердить что ты не робот

Чтение файлов, отправленных с помощью spark-submit драйвером

Я отправляю задание Spark для запуска в удаленном кластере, запустив

spark-submit ... --deploy-mode cluster --files some.properties ...

Я хочу прочитать содержимое файла some.properties с помощью кода драйвера, т.е. до создания контекста Spark и запуска задач RDD. Файл копируется в удаленный драйвер, но не в рабочий каталог драйвера.

Способы этой проблемы, о которых я знаю, следующие:

  • Загрузите файл в HDFS
  • Сохраните файл в банке приложения

Оба неудобны, поскольку этот файл часто изменяется на отправляющей машине dev.

Есть ли способ прочитать файл, который был загружен с использованием флага --files во время основного метода кода драйвера?

4b9b3361

Ответ 1

Да, вы можете получить доступ к файлам, загруженным с помощью аргумента --files.

Вот как я могу получить доступ к файлам, переданным через --files:

./bin/spark-submit \
--class com.MyClass \
--master yarn-cluster \
--files /path/to/some/file.ext \
--jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-rdbms-3.2.9.jar,lib/datanucleus-core-3.2.10.jar \
/path/to/app.jar file.ext

и в моем искровом коде:

val filename = args(0)
val linecount = Source.fromFile(filename).getLines.size

Я действительно верю, что эти файлы загружаются на рабочих в том же каталоге, что и jar, поэтому просто передается имя файла, а не абсолютный путь к Source.fromFile.

Ответ 2

Параметры -files и --archives поддерживают указание имен файлов С#, аналогичным Hadoop. Например, вы можете указать: -files localtest.txt # appSees.txt, и это выведет файл, который вы локально назвали localtest.txt в HDFS, но это будет связано с именем appSees.txt, и ваше приложение должно использовать имя как appSees.txt, чтобы ссылаться на него при запуске на YARN.

это работает для моего искрового потокового приложения как в режиме пряжи/клиента, так и в режиме пряжи/кластера. может помочь вам

Ответ 3

После расследования я нашел одно решение по вышеуказанному вопросу. Отправьте конфигурацию any.properties во время spark-submit и используйте ее драйвером spark до и после инициализации SparkSession. Надеюсь, это поможет вам.

any.properties

spark.key=value
spark.app.name=MyApp

SparkTest.java

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class SparkTest{

  public Static void main(String[] args){

    String warehouseLocation = new File("spark-warehouse").getAbsolutePath();

    Config conf = loadConf();
    System.out.println(conf.getString("spark.key"));

    // Initialize SparkContext and use configuration from properties
    SparkConf sparkConf = new SparkConf(true).setAppName(conf.getString("spark.app.name"));

    SparkSession sparkSession = 
    SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", warehouseLocation)
                .enableHiveSupport().getOrCreate();

    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

  }


  public static Config loadConf() {

      String configFileName = "any.properties";
      System.out.println(configFileName);
      Config configs = ConfigFactory.load(ConfigFactory.parseFile(new java.io.File(configFileName)));
      System.out.println(configs.getString("spark.key")); // get value from properties file
      return configs;
   }
}

Искра Отправить:

spark-submit --class SparkTest --master yarn --deploy-mode client --files any.properties,yy-site.xml --jars ...........

Ответ 4

Вот хорошее решение, которое я разработал в Python Spark, чтобы интегрировать любые данные в виде файла извне на платформу Big Data.

Удачи.

# Load from the Spark driver any local text file and return a RDD (really useful in YARN mode to integrate new data at the fly)
# (See https://community.hortonworks.com/questions/38482/loading-local-file-to-apache-spark.html)
def parallelizeTextFileToRDD(sparkContext, localTextFilePath, splitChar):
    localTextFilePath = localTextFilePath.strip(' ')
    if (localTextFilePath.startswith("file://")):
        localTextFilePath = localTextFilePath[7:]
    import subprocess
    dataBytes = subprocess.check_output("cat " + localTextFilePath, shell=True)
    textRDD = sparkContext.parallelize(dataBytes.split(splitChar))
    return textRDD

# Usage example
myRDD = parallelizeTextFileToRDD(sc, '~/myTextFile.txt', '\n') # Load my local file as a RDD
myRDD.saveAsTextFile('/user/foo/myTextFile') # Store my data to HDFS

Ответ 5

Поворот проблемы заключается в том, что вы можете создать временный SparkContext просто, вызвав SparkContext.getOrCreate(), а затем прочитайте файл, который вы передали в --files, с помощью SparkFiles.get('FILE').

Как только вы прочитаете файл, вы получите всю необходимую конфигурацию, необходимую в переменной SparkConf().

После этого вызовите эту функцию:

SparkContext.stop(SparkContext.getOrCreate())

Это приведет к отключению существующего SparkContext, а в следующей строке просто инициализирует новый SparkContext с необходимыми конфигурациями, подобными этому.

sc = SparkContext(conf=conf).getOrCreate()

Вы получили SparkContext с желаемыми настройками

Ответ 6

используйте spark-submit --help, обнаружите, что эта опция только для рабочего каталога исполнителя, а не драйвера.

--files FILES: Comma-separated list of files to be placed in the working directory of each executor.