Подтвердить что ты не робот

Кластер кластеров vs order by vs sort by

Насколько я понимаю;

  • сортировать только в редукторе

  • порядок по порядку вещей по всему миру, но сует все в один редукторы

  • Кластер по интеллектуально распределяет вещи в редукторы по ключевому хешу и делает сортировку по

Таким образом, мой вопрос - гарантирует ли кластер глобальный порядок? Распределить путем помещает те же ключи в те же редукторы, но как насчет смежных ключей?

Единственный документ, который я могу найти по этому вопросу, находится здесь, и из примера кажется, что он заказывает их по всему миру. Но из определения я чувствую, что это не всегда так.

4b9b3361

Ответ 1

Краткий ответ: да, CLUSTER BY гарантирует глобальный порядок, если вы готовы присоединиться к нескольким выходным файлам самостоятельно.

Более длинная версия:

  • ORDER BY x: гарантирует глобальный порядок, но делает это путем проталкивания всех данных через один редуктор. Это в принципе неприемлемо для больших наборов данных. В результате вы получите один отсортированный файл.
  • SORT BY x: упорядочивает данные в каждом из N редукторов, но каждый редуктор может получать перекрывающиеся диапазоны данных. В итоге вы получите N или более отсортированных файлов с перекрывающимися диапазонами.
  • DISTRIBUTE BY x: гарантирует, что каждый из N редукторов получит непересекающиеся диапазоны x, но не сортирует выходные данные каждого редуктора. Вы получите N или более несортированных файлов с неперекрывающимися диапазонами.
  • CLUSTER BY x: гарантирует, что каждый из N редукторов получит непересекающиеся диапазоны, а затем сортирует по этим диапазонам в редукторах. Это дает вам глобальное упорядочение, и то же самое, что и при выполнении (DISTRIBUTE BY x и SORT BY x). Вы получите N или более отсортированных файлов с неперекрывающимися диапазонами.

Есть смысл? Таким образом, CLUSTER BY является в основном более масштабируемой версией ORDER BY.

Ответ 2

Позвольте мне пояснить в первую очередь: clustered by распределяет ваши ключи только в разных ведрах, clustered by ... sorted by сортирует сортировки.

С помощью простого эксперимента (см. ниже) вы можете увидеть, что по умолчанию вы не получите глобальный порядок. Причина в том, что по умолчанию разделитель разделяет ключи с использованием хеш-кодов независимо от фактического заказа клавиш.

Однако вы можете полностью упорядочить свои данные.

Мотивация - "Hadoop: окончательное руководство" Тома Уайта (3-е издание, глава 8, стр. 274, "Всего сортировка" ), где он обсуждает TotalOrderPartitioner.

Сначала я отвечу на ваш вопрос TotalOrdering, а затем опишу несколько экспериментов по улусу, связанных с сортировкой, которые я сделал.

Имейте в виду: то, что я описываю здесь, является "доказательством концепции", я смог обработать один пример, используя дистрибутив Claudera CDH3.

Изначально я надеялся, что org.apache.hadoop.mapred.lib.TotalOrderPartitioner сделает трюк. К сожалению, это не так, потому что это похоже на разделение Hive по значению, а не на ключ. Поэтому я исправляю его (должен иметь подкласс, но у меня нет на это времени):

Заменить

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
}

с

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(value);
}

Теперь вы можете установить (исправленный) TotalOrderPartitioner как ваш обозреватель кустов:

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

hive> set total.order.partitioner.natural.order=false

hive> set total.order.partitioner.path=/user/yevgen/out_data2

Я также использовал

hive> set hive.enforce.bucketing = true; 

hive> set mapred.reduce.tasks=4;

в моих тестах.

Файл out_data2 сообщает TotalOrderPartitioner, как значения bucket. Вы генерируете out_data2 путем отбора ваших данных. В моих тестах я использовал 4 ведра и ключи от 0 до 10. Я генерировал out_data2 с использованием ad-hoc-подхода:

import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;


public class TotalPartitioner extends Configured implements Tool{
    public static void main(String[] args) throws Exception{
            ToolRunner.run(new TotalPartitioner(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path partFile = new Path("/home/yevgen/out_data2");
        FileSystem fs = FileSystem.getLocal(getConf());

        HiveKey key = new HiveKey();
        NullWritable value = NullWritable.get();

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
        key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
        writer.append(key, value);
        key.set( new byte[]{1, 6}, 0, 2);//partition at 6
        writer.append(key, value);
        key.set( new byte[]{1, 9}, 0, 2);//partition at 9
        writer.append(key, value);
        writer.close();
        return 0;
    }

}

Затем я скопировал результат out_data2 в HDFS (в/user/yevgen/out_data2)

С этими настройками я получил свои данные в bucketed/sorted (см. последний пункт в моем списке экспериментов).

Вот мои эксперименты.

  • Создание типовых данных

    bash > echo -e "1\n3\n2\n4\n5\n7\n6\n8\n9\n0" > data.txt

  • Создайте базовую тестовую таблицу:

    hive > create table test (x int); hive > загрузить данные local inpath 'data.txt' в таблицу test;

В основном эта таблица содержит значения от 0 до 9 без порядка.

  • Продемонстрируйте, как работает копирование таблицы (действительно параметр mapred.reduce.tasks, который устанавливает максимальное количество сокращаемых задач MAXIMAL)

    hive > создать таблицу test2 (x int);

    hive > set mapred.reduce.tasks = 4;

    hive > вставить таблицу перезаписи test2 выберите a.x из теста a тест присоединения b на a.x = b.x; - глупое объединение, чтобы заставить нетривиальное сокращение карты

    bash > hasoop fs -cat/user/hive/warehouse/test2/000001_0

    1

    5

    9

  • Демонстрируйте балансировку. Вы можете видеть, что ключи выбираются случайным образом без какого-либо порядка сортировки:

    hive > создать таблицу test3 (x int) сгруппированы по (x) в 4 ведра;

    hive > set hive.enforce.bucketing = true;

    hive > вставить таблицу перезаписи test3 выберите * из теста;

    bash > hadoop fs -cat/user/hive/warehouse/test3/000000_0

    4

    -

    0

  • Букинг с сортировкой. Результаты частично отсортированы, а не полностью отсортированы.

    hive > создать таблицу test4 (x int) сгруппированные по (x), отсортированные по (x desc) в 4 ведра;

    hive > вставить таблицу перезаписи test4 выберите * из теста;

    bash > hasoop fs -cat/user/hive/warehouse/test4/000001_0

    1

    5

    9

Вы можете видеть, что значения отсортированы в порядке возрастания. Похож на ошибку Hive в CDH3?

  • Частично отсортировано без указания кластера:

    hive > создать таблицу test5 как выберите x от испытания распределять по х sort by x desc;

    bash > hasoop fs -cat/user/hive/warehouse/test5/000001_0

    9

    5

    1

  • Используйте мой исправленный TotalOrderParitioner:

    hive > set hive.mapred.partitioner = org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

    hive > set total.order.partitioner.natural.order = false

    hive > set total.order.partitioner.path =/user/training/out_data2

    hive > создать таблицу test6 (x int) сгруппированные по (x), отсортированные по (x) на 4 ковши,

    hive > вставить таблицу перезаписи test6 выберите * из теста;

    bash > hasoop fs -cat/user/hive/warehouse/test6/000000_0

    1

    2

    0

    bash > hasoop fs -cat/user/hive/warehouse/test6/000001_0

    3

    4

    5

    bash > hasoop fs -cat/user/hive/warehouse/test6/000002_0

    7

    6

    -

    bash > hasoop fs -cat/user/hive/warehouse/test6/000003_0

    9

Ответ 3

Как я понимаю, короткий ответ - нет. Вы получите перекрывающиеся диапазоны.

Из документации SortBy: "Cluster By - это сокращение как для Distribute By, так и для Sort By". "Все строки с одинаковыми столбцами Распределить по будут идти к одному и тому же редуктору". Но нет информации, которая распространялась бы по гарантии непересекающихся диапазонов.

Более того, из документации DDL BucketedTables: "Как Hive распределяет строки по сегментам? В общем, номер сегмента определяется выражением hash_function (bucketing_column) mod num_buckets". Я предполагаю, что Cluster by в операторе Select использует тот же принцип для распределения строк между редукторами, потому что он используется главным образом для заполнения данных таблицами с интервалом.

Я создал таблицу с 1 целочисленным столбцом "а" и вставил туда цифры от 0 до 9.

Затем я установил число редукторов на 2 set mapred.reduce.tasks = 2; ,

И select данные из этой таблицы с помощью Cluster by clause select * from my_tab cluster by a;

И получил результат, который я ожидал:

0
2
4
6
8
1
3
5
7
9

Итак, первый редуктор (число 0) получил четные числа (потому что их режим 2 дает 0)

а второй редуктор (номер 1) получил нечетные числа (потому что их режим 2 дает 1)

Так что, как работает "Распределить по".

А затем "Сортировка по" сортирует результаты внутри каждого редуктора.

Ответ 4

CLUSTER BY не создает глобальное упорядочение.

Принятый ответ (Ларс Йенкен) вводит в заблуждение, заявляя, что редукторы получат неперекрывающиеся диапазоны. Поскольку Антон Завирюхин правильно указывает на документацию BucketedTables, CLUSTER BY в основном РАСПРОСТРАНЯЕТСЯ (так же, как bucketing) плюс SORT BY в каждом ковше/редукторе. И РАСПРОСТРАНЯЕТСЯ просто хэши и моды в ведра, а в то время как хеширующая функция может сохранить порядок (хеш i > хэш j, если i > j) mod hash value does not.

Здесь лучший пример, показывающий перекрывающиеся диапазоны

http://myitlearnings.com/bucketing-in-hive/

Ответ 5

Кластер на каждую сортировку редуктора не является глобальным. Во многих книгах также упоминается неправильно или смутно. Это особенно полезно, когда вы говорите, что вы распределяете каждый отдел на конкретный редуктор, а затем сортируете по имени сотрудника в каждом отделе и не заботитесь о том, чтобы какой-либо блок не использовался, и он больше выполняет ant по мере распределения рабочей нагрузки среди редукторов.

Ответ 6

SortBy: N или более отсортированных файлов с перекрывающимися диапазонами.

OrderBy: один выход, т.е. полностью упорядоченный.

Распределить по: Распределить За счет защиты каждый из N редукторов получает непересекающиеся диапазоны столбца, но не сортирует выходные данные каждого редуктора.

Для получения дополнительной информации http://commandstech.com/hive-sortby-vs-orderby-vs-distributeby-vs-clusterby/

ClusterBy: Обратитесь к тому же примеру, что и выше, если мы используем Cluster By x, два редуктора будут дополнительно сортировать строки по x: