не может копировать json - Потоки db Dynamo для красного смещения - программирование

не может копировать json - Потоки db Dynamo для красного смещения

Ниже приведен пример использования: я настроил enable Streams при создании DynamoDB с new and old Image Я создал Kinesis Firehose delivery stream с Destination as Redshift (Intermediate s3).

Из Dynamodb мой поток достигает Firhose и оттуда в Bucket как JSON (S3 Bucket -Gzip), приведенный ниже. Моя проблема в том, что я cannot COPY this JSON to redshift.

Вещи, которые я не могу получить:

    1. Не уверен, какой должен быть оператор Create table в Redshift
    1. Каким должен быть Синтаксис COPY в кирсисе Кинсиса.
    1. Как я должен использовать JsonPaths здесь. Kinesis Data firehouse настроен на возвращение только json в мое ведро s3.
    1. Как упомянуть Maniphest в команде COPY

JSON Нагрузка до S3 показана ниже:

{
    "Keys": {
        "vehicle_id": {
            "S": "x011"
        }
    },
    "NewImage": {
        "heart_beat": {
            "N": "0"
        },
        "cdc_id": {
            "N": "456"
        },
        "latitude": {
            "N": "1.30951"
        },
        "not_deployed_counter": {
            "N": "1"
        },
        "reg_ind": {
            "N": "0"
        },
        "operator": {
            "S": "x"
        },
        "d_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "z_id": {
            "N": "1267"
        },
        "last_end_trip_dttm": {
            "S": "11/08/2018 1:43:46 PM"
        },
        "land_ind": {
            "N": "1"
        },
        "s_ind": {
            "N": "1"
        },
        "status_change_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "case_ind": {
            "N": "1"
        },
        "last_po_change_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "violated_duration": {
            "N": "20"
        },
        "vehicle_id": {
            "S": "x011"
        },
        "longitude": {
            "N": "103.7818"
        },
        "file_status": {
            "S": "Trip_Start"
        },
        "unhired_duration": {
            "N": "10"
        },
        "eo_lat": {
            "N": "1.2345"
        },
        "reply_eo_ind": {
            "N": "1"
        },
        "license_ind": {
            "N": "0"
        },
        "indiscriminately_parked_ind": {
            "N": "0"
        },
        "eo_lng": {
            "N": "102.8978"
        },
        "officer_id": {
            "S": "[email protected]"
        },
        "case_status": {
            "N": "0"
        },
        "color_status_cd": {
            "N": "0"
        },
        "parking_id": {
            "N": "2345"
        },
        "ttr_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "deployed_ind": {
            "N": "1"
        },
        "status": {
            "S": "PI"
        }
    },
    "SequenceNumber": "1200000000000956615967",
    "SizeBytes": 570,
    "ApproximateCreationDateTime": 1535513040,
    "eventName": "INSERT"
}

Инструкция My Create table:

create table vehicle_status(
    heart_beat integer,
    cdc_id integer,
    latitude integer,   
    not_deployed_counter integer,
    reg_ind integer,
    operator varchar(10),
    d_dttm varchar(30),
    z_id integer,
    last_end_trip_dttm varchar(30),
    land_ind integer,
    s_ind integer,
    status_change_dttm varchar(30), 
    case_ind integer,
    last_po_change_dttm varchar(30),    
    violated_duration integer,
    vehicle_id varchar(8),
    longitude integer,  
    file_status varchar(30),
    unhired_duration integer,
    eo_lat integer,                     
    reply_eo_ind integer,
    license_ind integer,    
    indiscriminately_parked_ind integer,
    eo_lng integer,
    officer_id varchar(50),
    case_status integer,
    color_status_cd integer,
    parking_id integer,
    ttr_dttm varchar(30),
    deployed_ind varchar(3),
  status varchar(8));

И My Copy Statement (Вручную пытаться перенести это из Redshift):

COPY vehicle_status (heart_beat, cdc_id, latitude, not_deployed_counter, reg_ind, operator, d_dttm, z_id, last_end_trip_dttm, land_ind, s_ind, status_change_dttm, case_ind, last_po_change_dttm, violated_duration, vehicle_id, longitude, file_status, unhired_duration, eo_lat, reply_eo_ind, license_ind, indiscriminately_parked_ind, eo_lng, officer_id, case_status, color_status_cd, parking_id, ttr_dttm, deployed_ind, status) 
FROM 's3://<my-bucket>/2018/08/29/05/vehicle_status_change-2-2018-08-29-05-24-42-092c330b-e14a-4133-bf4a-5982f2e1f49e.gz' CREDENTIALS 'aws_iam_role=arn:aws:iam::<accountnum>:role/<RedshiftRole>' GZIP json 'auto';

Когда я пытаюсь выполнить описанную выше процедуру - я получаю Вставить записи - но все столбцы и строки имеют значение NULL.

Как я могу скопировать этот формат json в redhsift. Застряли здесь последние 3 дня. Любая помощь по этому поводу.

S3 Ведро:

Amazon S3/<My-bucket>/2018/08/29/05
Amazon S3/<My-bucket>/manifests/2018/08/29/05
4b9b3361

Ответ 1

Я не очень хорошо знаком с Amazon, но позвольте мне попытаться ответить на большинство ваших вопросов, чтобы вы могли двигаться дальше. Другие люди могут отредактировать этот ответ или дополнительные сведения. Спасибо!

Не уверен, какой должен быть оператор Create table в Redshift

У вашего оператора create table vehicle_status(...) нет проблем, хотя вы можете добавить distribution key sort key и encoding на основе вашего требования, подробнее здесь и здесь

Согласно документам AWS Kenesis, ваша таблица должна присутствовать в Redshift, поэтому вы можете подключиться к Redshift с помощью команды psql и запустить create statement вручную.

Каким должен быть Синтаксис COPY в кирсисе Кинсиса.

Синтаксис Copy останется таким же, как вы его запускаете через psql или firhose, к счастью, сценарий копирования, который вы придумали, работает без ошибок, я попробовал его в своем экземпляре с небольшой модификацией прямого предложения ключей AWS/SECRET а затем отлично работает, здесь я запустил sql который отлично работал и скопировал 1 запись данных в таблицу vehicle_status.

На самом деле ваша структура пути json сложна, поэтому json 'auto' не будет работать. Вот рабочая команда, я создал образец файла jsonpath для вас с 4 примерами полей, и вы могли бы следовать той же структуре, чтобы создать файл jsonpath со всеми точками данных.

 COPY vehicle_status (heart_beat, cdc_id, operator, status) FROM 's3://XXX/development/test_file.json' CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXXX;aws_secret_access_key=MYXXXXXXXXXXXXXXXXXXXXXX' json 's3://XXX/development/yourjsonpathfile';

И ваш json path file должен иметь контент, подобный приведенному ниже.

{
  "jsonpaths": [
    "$['NewImage']['heart_beat']['N']",
    "$['NewImage']['cdc_id']['N']",
    "$['NewImage']['operator']['S']",
    "$['NewImage']['status']['S']"
  ]
}

Я тестировал его, и он работает.

Как я должен использовать JsonPaths здесь. Kinesis Data firehouse настроен на возвращение только json в мое ведро s3.

Я использовал только ваши данные json и он работает, поэтому я не вижу здесь никакой проблемы.

Как упомянуть Maniphest в команде COPY

Это хороший вопрос, я мог бы попытаться объяснить это, надеюсь, здесь вы имеете в виду menifest.

Если вы видите команду над копией, она отлично работает для одного файла или пары файлов, но думаю, что у вас много файлов, здесь идет концепция menifest. Прямо из документов Amazon: "Вместо того, чтобы поставлять объектный путь для команды COPY, вы указываете имя текстового файла в формате JSON, в котором явно указаны файлы для загрузки".

Короче говоря, если вы хотите загружать несколько файлов в один снимок, что также предпочтительнее Redshift, вы можете создать простой menifest с json и предоставить его в команде copy.

{ "entries": [ {"url":"s3://mybucket-alpha/2013-10-04-custdata", "mandatory":true}, {"url":"s3://mybucket-alpha/2013-10-05-custdata", "mandatory":true},.... ] }

загрузите манифест на S3 и используйте его в своей команде копирования, как показано ниже.

 COPY vehicle_status (heart_beat, cdc_id, latitude, not_deployed_counter, reg_ind, operator, d_dttm, z_id, last_end_trip_dttm, land_ind, s_ind, status_change_dttm, case_ind, last_po_change_dttm, violated_duration, vehicle_id, longitude, file_status, unhired_duration, eo_lat, reply_eo_ind, license_ind, indiscriminately_parked_ind, eo_lng, officer_id, case_status, color_status_cd, parking_id, ttr_dttm, deployed_ind, status) FROM 's3://XXX/development/test.menifest' CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXXX;aws_secret_access_key=MYXXXXXXXXXXXXXXXXXXXXXX' json 's3://yourbucket/jsonpath' menifest;

Вот подробный справочник для манифеста.

Надеюсь, это даст вам некоторые идеи, как двигаться дальше, и если есть особая ошибка, которую вы видите, я был бы счастлив переориентироваться на ответ.