Подтвердить что ты не робот

Как перезаписать/повторно использовать существующий путь вывода для рабочих заданий Hadoop и agian

Я хочу перезаписать/повторно использовать существующий выходной каталог, когда я запускаю свою работу Hadoop ежедневно. Фактически в выходном каталоге будет храниться итоговый результат каждого результата задания рабочего дня. Если я укажу один и тот же выходной каталог, он выдаст ошибку "выходной каталог уже существует".

Как обойти эту проверку?

4b9b3361

Ответ 1

Как удалить каталог перед запуском задания?

Вы можете сделать это через оболочку:

hadoop fs -rmr /path/to/your/output/

или через API Java:

// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);

Ответ 2

Ответ Jungblut - ваше прямое решение. Поскольку я никогда не доверяю автоматическим процессам для удаления файлов (лично), я предлагаю альтернативу:

Вместо того, чтобы пытаться перезаписать, я предлагаю вам сделать динамическое имя вывода вашей работы, включая время его выполнения.

Что-то вроде "/path/to/your/output-2011-10-09-23-04/". Таким образом, вы можете сохранить свою старую работу на случай, если вам когда-нибудь понадобится вернуться. В моей системе, которая запускает 10 + ежедневные задания, мы структурируем вывод: /output/job1/2011/10/09/job1out/part-r-xxxxx, /output/job1/2011/10/10/job1out/part-r-xxxxx и т.д.

Ответ 3

Hadoop TextInputFormat (который, как я полагаю, вы используете) не позволяет перезаписать существующий каталог. Вероятно, чтобы извинить вас от боли, узнав, что вы по ошибке удалили то, что вы (и ваш кластер) очень усердно работали.

Однако, если вы уверены, что хотите, чтобы ваша папка вывода была перезаписана заданием, я считаю, что самый чистый способ - это немного изменить TextOutputFormat:

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
      public RecordWriter<K, V> 
      getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException 
      {
          Configuration conf = job.getConfiguration();
          boolean isCompressed = getCompressOutput(job);
          String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
          CompressionCodec codec = null;
          String extension = "";
          if (isCompressed) 
          {
              Class<? extends CompressionCodec> codecClass = 
                      getOutputCompressorClass(job, GzipCodec.class);
              codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
              extension = codec.getDefaultExtension();
          }
          Path file = getDefaultWorkFile(job, extension);
          FileSystem fs = file.getFileSystem(conf);
          FSDataOutputStream fileOut = fs.create(file, true);
          if (!isCompressed) 
          {
              return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
          } 
          else 
          {
              return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
          }
      }
}

Теперь вы создаете FSDataOutputStream (fs.create(file, true)) с помощью overwrite = true.

Ответ 4

Hadoop уже поддерживает эффект, который, как вам кажется, пытается достичь, позволяя нескольким путям ввода работать. Вместо того, чтобы пытаться иметь один каталог файлов, к которым вы добавляете больше файлов, укажите каталог каталогов, к которым вы добавляете новые каталоги. Чтобы использовать итоговый результат в качестве ввода, просто укажите входной глобус как подстановочный знак в подкаталогах (например, my-aggregate-output/*). Чтобы "добавить" новые данные в агрегат в качестве вывода, просто укажите новый уникальный подкаталог агрегата в качестве выходного каталога, как правило, используя временную метку или некоторый порядковый номер, полученный из ваших входных данных (например, my-aggregate-output/20140415154424).

Ответ 5

Вы можете создать подкаталог вывода для каждого выполнения по времени. Например, предположим, что вы ожидаете выходную директорию от пользователя, а затем установите ее следующим образом:

FileOutputFormat.setOutputPath(job, new Path(args[1]);

Измените это на следующие строки:

String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis()));
FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));

Ответ 6

У меня был похожий вариант использования, я использую MultipleOutputs для решения этой проблемы.

Например, если я хочу, чтобы разные задания MapReduce записывали в один и тот же каталог /outputDir/. Задание 1 записывает в /outputDir/job1-part1.txt, задание 2 записывает в /outputDir/job1-part2.txt (без удаления существующих файлов).

В основном установите выходной каталог на случайный (его можно удалить до запуска нового задания)

FileInputFormat.addInputPath(job, new Path("/randomPath"));

В редукторе/маппере используйте MultipleOutputs и настройте модуль записи на запись в желаемый каталог:

public void setup(Context context) {
    MultipleOutputs mos = new MultipleOutputs(context);
}

а также:

mos.write(key, value, "/outputDir/fileOfJobX.txt")

Тем не менее, мой вариант использования был немного сложнее, чем это. Если это просто запись в один и тот же плоский каталог, вы можете записать в другой каталог и запустить скрипт для переноса файлов, например: hadoop fs -mv/tmp/*/outputDir

В моем случае каждое задание MapReduce выполняет запись в разные подкаталоги в зависимости от значения записываемого сообщения. Структура каталогов может быть многослойной, например:

/outputDir/
    messageTypeA/
        messageSubTypeA1/
            job1Output/
                job1-part1.txt
                job1-part2.txt
                ...
            job2Output/
                job2-part1.txt
                ...

        messageSubTypeA2/
        ...
    messageTypeB/
    ...

Каждое задание Mapreduce может записывать в тысячи подкаталогов. И стоимость записи в каталог tmp и перемещения каждого файла в правильный каталог высока.

Ответ 7

Если кто-то загружает входной файл (например, с добавленными записями) из локальной файловой системы в распределенную файловую систему hadoop как таковую:

hdfs dfs -put  /mylocalfile /user/cloudera/purchase

Затем можно также перезаписать/повторно использовать существующий выходной каталог с помощью -f. Нет необходимости удалять или заново создавать папку

hdfs dfs -put -f  /updated_mylocalfile /user/cloudera/purchase

Ответ 8

Hadoop придерживается философии "Пиши один раз, читай много раз" Таким образом, когда вы пытаетесь записать в каталог снова, он предполагает, что должен создать новый (Писать один раз), но он уже существует, и поэтому он жалуется. Вы можете удалить его через hadoop fs -rmr/path/to/your/output/. Лучше создать динамический каталог (например, на основе метки времени или значения хеша), чтобы сохранить данные.

Ответ 9

Я столкнулся с этой точной проблемой, это связано с исключением, возникшим в checkOutputSpecs в классе FileOutputFormat. В моем случае я хотел иметь много заданий на добавление файлов в уже существующие каталоги, и я гарантировал, что файлы будут иметь уникальные имена.

Я решил это, создав класс выходного формата, который переопределяет только метод checkOutputSpecs и душит (игнорирует) FileAlreadyExistsException которое FileAlreadyExistsException его при проверке, существует ли каталог.

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public void checkOutputSpecs(JobContext job) throws IOException {
        try {
            super.checkOutputSpecs(job);
        }catch (FileAlreadyExistsException ignored){
            // Suffocate the exception
        }
    }
}

И в конфигурации задания я использовал LazyOutputFormat а также MultipleOutputs.

LazyOutputFormat.setOutputFormatClass(job, OverwriteTextOutputFormat.class);