Подтвердить что ты не робот

Глобальные переменные в hadoop

Моя программа следует за итеративным отображением/сокращением. И он должен остановиться, если будут выполнены определенные условия. В любом случае я могу установить глобальную переменную, которая может быть распределена по всем задачам map/reduce и проверить, достигает ли глобальная переменная условие завершения.

Что-то вроде этого.

While(Condition != true){

            Configuration conf = getConf();
            Job job = new Job(conf, "Dijkstra Graph Search");

            job.setJarByClass(GraphSearch.class);
            job.setMapperClass(DijkstraMap.class);
            job.setReducerClass(DijkstraReduce.class);

            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);

}

Где условие - глобальная переменная, которая изменяется во время/после каждой карты/уменьшает выполнение.

4b9b3361

Ответ 1

Каждый раз, когда вы запускаете работу по сокращению карты, вы можете проверить состояние вывода, значения, содержащиеся в счетчиках и т.д., и принять решение в node, которое контролирует итерацию, независимо от того, хотите ли вы больше итераций или нет. Наверное, я не понимаю, откуда возникает потребность в глобальном состоянии в вашем сценарии.

В более общем плане есть два основных способа совместного использования состояния между исполняющими узлами (хотя следует отметить, что лучше всего избегать состояния совместного использования, поскольку оно ограничивает масштабируемость).

  • Запишите файл в HDFS, который могут считывать другие узлы (убедитесь, что файл очищается при завершении задания и что спекулятивное выполнение не вызовет странных сбоев).
  • Используйте ZooKeeper для хранения некоторых данных в выделенных узлах дерева ZK.

Ответ 2

Вы можете использовать Configuration.set(String name, String value), чтобы установить значение, которое вы сможете получить в своих Mappers/Reducers/etc:

В вашем драйвере:

   conf.set("my.dijkstra.parameter", "value");

И, например, в вашем картографе:

public void configure(JobConf job) {
       myParam = job.get("my.dijkstra.parameter");
   }

Но это вряд ли поможет вам просмотреть результаты предыдущих заданий, чтобы решить, следует ли запускать еще одну итерацию. То есть это значение не будет возвращено после выполнения задания.

Вы также можете использовать Hadoop DistributedCache для хранения файлов, которые будут распределены между всеми узлами. Это немного лучше, чем просто хранить что-то на HDFS, если значение, которое вы собираетесь пройти таким образом, является чем-то маленьким.

Конечно, counters также может использоваться для этой цели. Но они не выглядят слишком надежными для принятия решений в алгоритме. Похоже, в некоторых случаях их можно увеличить дважды (если какая-то задача была выполнена более одного раза, например, в случае сбоя или спекулятивного исполнения) - я не уверен.

Ответ 3

Вот как это работает в Hadoop 2.0

В вашем драйвере:

 conf.set("my.dijkstra.parameter", "value");

И в вашем Mapper:

protected void setup(Context context) throws IOException,
            InterruptedException {
        Configuration conf = context.getConfiguration();

        strProp = conf.get("my.dijkstra.parameter");
        // and then you can use it
    }

Ответ 4

Вы можете использовать Cascading для организации нескольких заданий Hadoop. Укажите путь HDFS, в котором вы хотите сохранить глобальную переменную состояния и инициализировать фиктивным содержимым. На каждой итерации прочитайте текущее содержимое этого пути HDFS, удалите это содержимое, выполните любое количество шагов карты/уменьшения и, наконец, выполните глобальное сокращение, которое обновляет глобальную переменную состояния. В зависимости от характера вашей задачи вам может потребоваться отключить спекулятивное выполнение и разрешить много попыток.