Подтвердить что ты не робот

Spark Context Textfile: загрузка нескольких файлов

Мне нужно обработать несколько файлов, разбросанных по различным каталогам. Я хотел бы загрузить все это в один RDD, а затем выполнить карту/уменьшить на нем. Я вижу, что SparkContext способен загружать несколько файлов из одного каталога с помощью подстановочных знаков. Я не уверен, как загружать файлы из нескольких папок.

Следующий фрагмент кода завершается с ошибкой:

for fileEntry in files:
    fileName = basePath + "/" + fileEntry
    lines = sc.textFile(fileName)
    if retval == None:
        retval = lines
    else:
        retval = sc.union(retval, lines)

Это не работает в третьем цикле со следующим сообщением об ошибке:

retval = sc.union(retval, lines)
TypeError: union() takes exactly 2 arguments (3 given)

Что причудливо, я предоставляю только 2 аргумента. Любые указатели оценили.

4b9b3361

Ответ 1

Как насчет этой фразы?

sc.union([sc.textFile(basepath + "/" + f) for f in files])

В Scala SparkContext.union() есть два варианта: один, который принимает аргументы vararg, и тот, который принимает список. Только второй существует в Python (поскольку Python не имеет полиморфизма).

ОБНОВЛЕНИЕ

Вы можете использовать один вызов textFile для чтения нескольких файлов.

sc.textFile(','.join(files))

Ответ 2

Я решаю подобные проблемы, используя подстановочный знак.

например. Я нашел некоторые черты в файлах, которые я хочу загрузить в искры,

реж

subdir1/folder1/x.txt

     

subdir2/folder2/y.txt

вы можете использовать следующее предложение

sc.textFile("dir/*/*/*.txt")

для загрузки всех относительных файлов.

Подстановочный знак '*' работает только в одном уровне каталога, который не является рекурсивным.

Ответ 3

Вы можете использовать следующую функцию SparkContext:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Прочитайте каталог текстовых файлов из HDFS, локальной файловой системы (доступной на всех узлах) или любого поддерживаемого Hadoop URI файловой системы. Каждый файл считывается как одна запись и возвращается в пару ключ-значение, где ключ - это путь к каждому файлу, значение - это содержимое каждого файла.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

Ответ 4

Вы можете использовать этот

Сначала вы можете получить буфер/список путей S3:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

Теперь передайте этот объект List следующему фрагменту кода, обратите внимание: sc является объектом SQLContext

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

Теперь вы получили окончательный унифицированный RDD, т.е. df

Необязательно, и вы также можете переделать его в одном BigRDD

val files = sc.textFile(filename, 1).repartition(1)

Перераспределение всегда работает: D