Подтвердить что ты не робот

Как бы вы предложили выполнить "Присоединиться" к потоку Hadoop?

У меня есть два файла в следующих форматах:

field1, field2, field3
field4, field1, field5

Другой номер поля указывает другое значение.

Я хочу присоединиться к двум файлам, используя Hadoop Streaming на основе взаимного поля (field1 в приведенном выше примере), поэтому вывод будет field1, field2, field3, field4, field5 (другие упорядочения в порядке, так же как и все поля).

4b9b3361

Ответ 1

Hadoop имеет библиотеку под названием KeyFieldBasedPartitioner http://hadoop.apache.org/mapreduce/docs/r0.21.0/api/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.html

Используя это как опцию при запуске задания в качестве разделителя для вашего потокового задания, вы можете разбить выход вашего картографа на пары "ключ/значение", а ключи будут хэшироваться вместе, переходя к одному и тому же редуктору и сортировке, включая значения http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#More+Usage+Examples

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D mapreduce.map.output.key.field.separator=. \
-D mapreduce.partition.keypartitioner.options=-k1,2 \
-D mapreduce.job.reduces=12 \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 

Здесь, -D stream.map.output.field.separator =. и -D stream.num.map.output.key.fields = 4 объясняются здесь http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#Customizing+How+Lines+are+Split+into+Key%2FValue+Pairs, в основном это то, как вы вывели свой картограф поля для определения пар ключ/значение.

Ключи вывода карты вышеуказанного задания MapReduce обычно имеют четыре поля, разделенные символом ".". Однако структура MapReduce будет разбивать выходы карты на первые два поля ключей, используя параметр -D mapreduce.partition.keypartitioner.options = -k1,2. Здесь, -D mapreduce.map.output.key.field.separator =. определяет разделитель для раздела. Это гарантирует, что все пары ключ/значение с теми же первыми двумя полями в ключах будут разделены на один и тот же редуктор.

Это фактически эквивалентно заданию первых двух полей в качестве первичного ключа и следующих двух полей в качестве вторичного. Первичный ключ используется для разбиения на разделы, и для сортировки используется комбинация первичного и вторичного ключей.

Для того, чтобы сделать соединение, это так же просто, как вывод полей из вашего картографа и установка параметров запуска вашей конфигурации для полей, которые являются ключами, а редуктор будет иметь все ваши значения, соединенные ключом соответствующим образом. Если вы хотите брать данные из нескольких источников, просто добавляйте больше -input в командной строке... если они разные длины ввода, а затем в вашем картографе вы можете распознать это и создать стандартный формат вывода из mapper.

Ответ 2

Cascading предоставляет полезные абстракции для фильтрации, объединения и группировки по полю. fooobar.com/info/468338/... ссылки на полезный пример использования потоковой передачи Hadoop в каскадировании.