Подтвердить что ты не робот

Как эффективно перемещать данные с Kafka на таблицу Impala?

Вот шаги к текущему процессу:

  • Flafka записывает журналы в "зону приземления" на HDFS.
  • Задание, запланированное Oozie, копирует полные файлы из зоны приземления в промежуточную область.
  • Данные промежуточного этапа являются "schema-ified" таблицей Hive, которая использует промежуточную область в качестве своего местоположения.
  • Записи из промежуточной таблицы добавляются в постоянную таблицу Hive (например, insert into permanent_table select * from staging_table).
  • Данные из таблицы Hive доступны в Impala, выполнив refresh permanent_table в Impala.

существующий поток данных

Я смотрю на процесс, который я создал, и он "плохо пахнет": слишком много промежуточных шагов, которые ухудшают поток данных.

Около 20 месяцев назад я увидел демоверсию, в которой данные транслировались из трубки Amazon Kinesis, и была доступна в Impala в почти реальном времени. Я не думаю, что они сделали что-то довольно уродливое/запутанное. Есть ли более эффективный способ передачи данных от Kafka в Impala (возможно, потребитель Kafka, который может сериализоваться в Parquet)?

Я предполагаю, что "потоковая передача данных в SQL с низкой задержкой" должна быть довольно распространенным вариантом использования, и поэтому мне интересно узнать, как другие люди решили эту проблему.

4b9b3361

Ответ 1

Если вам нужно сбрасывать данные Kafka как есть в HDFS, лучшим вариантом является использование соединителя Kafka Connect и Confluent HDFS.

Вы можете либо сбросить данные в файл паркета на HDFS, который вы можете загрузить в Impala. Вам нужно, я думаю, вы захотите использовать секвенсор TimeBasedPartitioner для создания паркетных файлов каждые X миллисекунд (настройка параметра конфигурации partition.duration.ms).

Добавьте что-то вроде этого в конфигурацию Kafka Connect, чтобы сделать трюк:

# Don't flush less than 1000 messages to HDFS
flush.size = 1000 

# Dump to parquet files   

format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

partitioner.class = TimebasedPartitioner

# One file every hour. If you change this, remember to change the filename format to reflect this change
partition.duration.ms = 3600000
# Filename format
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm