Подтвердить что ты не робот

Hadoop DistributedCache устарел - какой предпочтительный API?

Для задач моей карты требуются некоторые данные конфигурации, которые я хотел бы распространять через распределенный кэш.

Hadoop MapReduce Tutorial показывает использование класса DistributedCache примерно следующим образом:

// In the driver
JobConf conf = new JobConf(getConf(), WordCount.class);
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf); 

// In the mapper
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job);
...

Однако DistributedCache отмечен как устаревший в Hadoop 2.2.0.

Какой новый предпочтительный способ достичь этого? Есть ли современный пример или учебник, охватывающий этот API?

4b9b3361

Ответ 1

API-интерфейсы распределенного кэша можно найти в самом классе Job. Проверьте документацию здесь: http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html  Код должен быть чем-то вроде

Job job = new Job();
...
job.addCacheFile(new Path(filename).toUri());

В коде вашего картографа:

Path[] localPaths = context.getLocalCacheFiles();
...

Ответ 2

Чтобы расширить на @jtravaglini, предпочтительный способ использования DistributedCache для YARN/MapReduce 2 выглядит следующим образом:

В вашем драйвере используйте Job.addCacheFile()

public int run(String[] args) throws Exception {
    Configuration conf = getConf();

    Job job = Job.getInstance(conf, "MyJob");

    job.setMapperClass(MyMapper.class);

    // ...

    // Mind the # sign after the absolute file location.
    // You will be using the name after the # sign as your
    // file name in your Mapper/Reducer
    job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some"));
    job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other"));

    return job.waitForCompletion(true) ? 0 : 1;
}

И в вашем Mapper/Reducer переопределите метод setup(Context context):

@Override
protected void setup(
        Mapper<LongWritable, Text, Text, Text>.Context context)
        throws IOException, InterruptedException {
    if (context.getCacheFiles() != null
            && context.getCacheFiles().length > 0) {

        File some_file = new File("./some");
        File other_file = new File("./other");

        // Do things to these two files, like read them
        // or parse as JSON or whatever.
    }
    super.setup(context);
}

Ответ 4

Я не использовал job.addCacheFile(). Вместо этого я использовал опцию -files как "-files/path/to/myfile.txt#myfile", как и раньше. Затем в коде транслятора или редуктора я использую следующий метод:

/**
 * This method can be used with local execution or HDFS execution. 
 * 
 * @param context
 * @param symLink
 * @param throwExceptionIfNotFound
 * @return
 * @throws IOException
 */
public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException
{
    URI[] uris = context.getCacheFiles();
    if(uris==null||uris.length==0)
    {
        if(throwExceptionIfNotFound)
            throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
        return null;
    }
    URI symlinkUri = null;
    for(URI uri: uris)
    {
        if(symLink.equals(uri.getFragment()))
        {
            symlinkUri = uri;
            break;
        }
    }   
    if(symlinkUri==null)
    {
        if(throwExceptionIfNotFound)
            throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
        return null;
    }
    //if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink
    return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink);

}

Тогда в mapper/редукторе:

@Override
protected void setup(Context context) throws IOException, InterruptedException
{
    super.setup(context);

    File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true);
    ... do work ...
}

Обратите внимание, что если я использовал "-files/path/to/myfile.txt" напрямую, мне нужно использовать "myfile.txt" для доступа к файлу, так как это символическое имя по умолчанию.

Ответ 5

Ни одно из упомянутых решений не работало для меня в полноте. Это может быть потому, что версия Hadoop продолжает меняться. Я использую hasoop 2.6.4. По сути, DistributedCache устарел, поэтому я не хотел его использовать. Поскольку некоторые из сообщений предлагают нам использовать addCacheFile(), однако, он немного изменился. Вот как это сработало для меня

job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt"));

Здесь X.X.X.X может быть основным IP-адресом или локальным хостом. Файл EnglishStop.txt хранился в HDFS в /location.

hadoop fs -ls /

Выходной сигнал

-rw-r--r--   3 centos supergroup       1833 2016-03-12 20:24 /EnglishStop.txt
drwxr-xr-x   - centos supergroup          0 2016-03-12 19:46 /test

Забавный, но удобный, # EnglishStop.txt означает, что теперь мы можем получить к нему доступ как "EnglishStop.txt" в mapper. Вот код для того же

public void setup(Context context) throws IOException, InterruptedException     
{
    File stopwordFile = new File("EnglishStop.txt");
    FileInputStream fis = new FileInputStream(stopwordFile);
    BufferedReader reader = new BufferedReader(new InputStreamReader(fis));

    while ((stopWord = reader.readLine()) != null) {
        // stopWord is a word read from Cache
    }
}

Это просто сработало для меня. Вы можете читать строку из файла, хранящегося в HDFS

Ответ 6

У меня была та же проблема. И не только DistributedCach устарел, но getLocalCacheFiles и "new Job" тоже. Так что для меня работало следующее:

Driver:

Configuration conf = getConf();
Job job = Job.getInstance(conf);
...
job.addCacheFile(new Path(filename).toUri());

В настройке Mapper/Reducer:

@Override
protected void setup(Context context) throws IOException, InterruptedException
{
    super.setup(context);

    URI[] files = context.getCacheFiles(); // getCacheFiles returns null

    Path file1path = new Path(files[0])
    ...
}