Объединение файлов hdfs - программирование
Подтвердить что ты не робот

Объединение файлов hdfs

У меня есть 1000+ файлов, доступных в HDFS с соглашением об именах от 1_fileName.txt до N_fileName.txt. Размер каждого файла - 1024 МБ. Мне нужно объединить эти файлы в один (HDFS) с сохранением порядка файла. Скажем 5_FileName.txt следует добавлять только после 4_fileName.txt

Каков наилучший и быстрый способ выполнить эту операцию.

Есть ли способ выполнить это слияние без копирования фактических данных между узлами данных? Для e-g: получить расположение блоков этих файлов и создать новую запись (имя_файла) в Namenode с этими расположениями блоков?

4b9b3361

Ответ 1

Нет эффективного способа сделать это, вам нужно переместить все данные на один node, а затем вернуться к HDFS.

Сценарий командной строки для этого может быть следующим:

hadoop fs -text *_fileName.txt | hadoop fs -put - targetFilename.txt

Это приведет ко всем файлам, которые соответствуют glob для стандартного вывода, затем вы будете передавать этот поток команде put и вывести поток в файл HDFS с именем targetFilename.txt

Единственная проблема, с которой вы столкнулись, - это структура имени файла, для которой вы пошли - если у вас есть фиксированная ширина, нулевая часть номера будет проще, но в текущем состоянии вы получите неожиданный лексиграфический порядок (1, 10, 100, 1000, 11, 110 и т.д.), А не числовой порядок (1,2,3,4 и т.д.). Вы можете обойти это, изменив сценарий для:

hadoop fs -text [0-9]_fileName.txt [0-9][0-9]_fileName.txt \
    [0-9][0-9[0-9]_fileName.txt | hadoop fs -put - targetFilename.txt

Ответ 2

Существует метод API org.apache.hadoop.fs.FileUtil.copyMerge, который выполняет эту операцию:

public static boolean copyMerge(
                    FileSystem srcFS,
                    Path srcDir,
                    FileSystem dstFS,
                    Path dstFile,
                    boolean deleteSource,
                    Configuration conf,
                    String addString)

Он читает все файлы в srcDir в алфавитном порядке и добавляет их содержимое в dstFile.

Ответ 3

Если вы можете использовать искру. Это можно сделать, например,

sc.textFile("hdfs://...../part*).coalesce(1).saveAsTextFile("hdfs://...../filename)

Надеюсь, что это сработает, так как искра работает распределенным способом, вам не придется копировать файлы в один node. Хотя просто осторожность, объединение файлов в искру может быть медленным, если файлы очень большие.

Ответ 4

Поскольку порядок файлов важен, а лексикографический порядок не соответствует этой цели, он выглядит хорошим кандидатом для написания программы сопоставления для этой задачи, которая может периодически запускаться. Кстати говоря, нет редуктора, пишу это, поскольку задача карты HDFS эффективна, потому что она может объединить эти файлы в один выходной файл без большого перемещения данных по узлам данных. Поскольку исходные файлы находятся в HDFS, и поскольку задачи сопоставления будут проверять близость данных, он может объединять файлы без перемещения файлов через разные узлы данных.

Программе-картографу понадобится пользовательский InputSplit (беря имена файлов во входной директории и заказывая их по мере необходимости) и пользовательский InputFormat.

Преобразователь может использовать либо hdfs append, либо необработанный выходной поток, где он может писать в байте [].

Грубый эскиз программы Mapper, о которой я думаю, выглядит примерно так:

public class MergeOrderedFileMapper extends MapReduceBase implements Mapper<ArrayWritable, Text, ??, ??> 
{
    FileSystem fs;

    public void map(ArrayWritable sourceFiles, Text destFile, OutputCollector<??, ??> output, Reporter reporter) throws IOException 
    {

        //Convert the destFile to Path.
        ...
        //make sure the parent directory of destFile is created first.
        FSDataOutputStream destOS = fs.append(destFilePath);
        //Convert the sourceFiles to Paths.
        List<Path> srcPaths;
        ....
        ....
            for(Path p: sourcePaths) {

                FSDataInputStream srcIS = fs.open(p);
                byte[] fileContent
                srcIS.read(fileContent);
                destOS.write(fileContent);
                srcIS.close();
                reporter.progress();  // Important, else mapper taks may timeout.
            }
            destOS.close();


        // Delete source files.

        for(Path p: sourcePaths) {
            fs.delete(p, false);
            reporter.progress();
        }

    }
}