Я пытаюсь импортировать данные файла csv
в dynamodb.
пожалуйста, предоставьте мне предложение.
first_name last_name
sri ram
Rahul Dravid
JetPay Underwriter
Anil Kumar Gurram
Я пытаюсь импортировать данные файла csv
в dynamodb.
пожалуйста, предоставьте мне предложение.
first_name last_name
sri ram
Rahul Dravid
JetPay Underwriter
Anil Kumar Gurram
На каком языке вы хотите импортировать данные. Я просто пишу функцию в nodejs, которая может импортировать файл csv в таблицу dynamodb. Сначала он анализирует весь csv в массив, разбивает массив на куски (25), а затем batchWriteItem на таблицу.
Примечание. DynamoDB разрешает только 1-25 записей за раз в batchinsert. Поэтому нам нужно разбить наш массив на куски.
var fs = require('fs');
var parse = require('csv-parse');
var async = require('async');
var csv_filename = "YOUR_CSV_FILENAME_WITH_ABSOLUTE_PATH";
rs = fs.createReadStream(csv_filename);
parser = parse({
columns : true,
delimiter : ','
}, function(err, data) {
var split_arrays = [], size = 25;
while (data.length > 0) {
split_arrays.push(data.splice(0, size));
}
data_imported = false;
chunk_no = 1;
async.each(split_arrays, function(item_data, callback) {
ddb.batchWriteItem({
"TABLE_NAME" : item_data
}, {}, function(err, res, cap) {
console.log('done going next');
if (err == null) {
console.log('Success chunk #' + chunk_no);
data_imported = true;
} else {
console.log(err);
console.log('Fail chunk #' + chunk_no);
data_imported = false;
}
chunk_no++;
callback();
});
}, function() {
// run after loops
console.log('all data imported....');
});
});
rs.pipe(parser);
Вы можете использовать AWS Data Pipeline для таких вещей. Вы можете загрузить свой csv
файл на S3, а затем использовать конвейер данных для извлечения и заполнения таблицы DynamoDB. У них есть пошаговое руководство.
Как простой dev без perms для создания Data Pipeline, мне пришлось использовать этот javascript. Код Хасана Сидика был немного устаревшим, но это сработало для меня:
var fs = require('fs');
var parse = require('csv-parse');
var async = require('async');
const AWS = require('aws-sdk');
const dynamodbDocClient = new AWS.DynamoDB({ region: "eu-west-1" });
var csv_filename = "./CSV.csv";
rs = fs.createReadStream(csv_filename);
parser = parse({
columns : true,
delimiter : ','
}, function(err, data) {
var split_arrays = [], size = 25;
while (data.length > 0) {
//split_arrays.push(data.splice(0, size));
let cur25 = data.splice(0, size)
let item_data = []
for (var i = cur25.length - 1; i >= 0; i--) {
const this_item = {
"PutRequest" : {
"Item": {
// your column names here will vary, but you'll need do define the type
"Title": {
"S": cur25[i].Title
},
"Col2": {
"N": cur25[i].Col2
},
"Col3": {
"N": cur25[i].Col3
}
}
}
};
item_data.push(this_item)
}
split_arrays.push(item_data);
}
data_imported = false;
chunk_no = 1;
async.each(split_arrays, (item_data, callback) => {
const params = {
RequestItems: {
"tagPerformance" : item_data
}
}
dynamodbDocClient.batchWriteItem(params, function(err, res, cap) {
if (err === null) {
console.log('Success chunk #' + chunk_no);
data_imported = true;
} else {
console.log(err);
console.log('Fail chunk #' + chunk_no);
data_imported = false;
}
chunk_no++;
callback();
});
}, () => {
// run after loops
console.log('all data imported....');
});
});
rs.pipe(parser);
Вот мое решение. Я полагался на то, что был какой-то заголовок, указывающий, какой столбец сделал что. Простой и прямой. Отсутствие пропусков для быстрой загрузки.
import os, json, csv, yaml, time
from tqdm import tqdm
# For Database
import boto3
# Variable store
environment = {}
# Environment variables
with open("../env.yml", 'r') as stream:
try:
environment = yaml.load(stream)
except yaml.YAMLError as exc:
print(exc)
# Get the service resource.
dynamodb = boto3.resource('dynamodb',
aws_access_key_id=environment['AWS_ACCESS_KEY'],
aws_secret_access_key=environment['AWS_SECRET_KEY'],
region_name=environment['AWS_REGION_NAME'])
# Instantiate a table resource object without actually
# creating a DynamoDB table. Note that the attributes of this table
# are lazy-loaded: a request is not made nor are the attribute
# values populated until the attributes
# on the table resource are accessed or its load() method is called.
table = dynamodb.Table('data')
# Header
header = []
# Open CSV
with open('export.csv') as csvfile:
reader = csv.reader(csvfile,delimiter=',')
# Parse Each Line
with table.batch_writer() as batch:
for index,row in enumerate(tqdm(reader)):
if index == 0:
#save the header to be used as the keys
header = row
else:
if row == "":
continue
# Create JSON Object
# Push to DynamoDB
data = {}
# Iterate over each column
for index,entry in enumerate(header):
data[entry.lower()] = row[index]
response = batch.put_item(
Item=data
)
# Repeat
Я рекомендую использовать локальную версию DynamoDB, на случай, если вы захотите проверить это, прежде чем начинать платить, а что нет. Я сделал несколько небольших изменений, прежде чем опубликовать это, так что не забудьте проверить с любыми средствами, которые имеют смысл для вас. Я закомментировал фиктивное задание на пакетную загрузку, которое вы можете использовать вместо любой службы DynamoDB, удаленной или локальной, чтобы проверить в stdout, что это работает в соответствии с вашими потребностями.
Смотрите Dynamodb-локальный на npmjs или ручной установки
Если вы пошли по пути ручной установки, вы можете запустить DynamodB-Local примерно так:
java -Djava.library.path=<PATH_TO_DYNAMODB_LOCAL>/DynamoDBLocal_lib/\
-jar <PATH_TO_DYNAMODB_LOCAL>/DynamoDBLocal.jar\
-inMemory\
-sharedDb
Маршрут npm может быть проще.
Наряду с этим, смотрите Dynamodb-Admin.
Я установил DynamodB-Admin с помощью npm я -g dynamodb-admin
. Затем он может быть запущен с:
dynamodb-admin
DynamoDB-локальный по умолчанию localhost:8000
.
Dynamodb-admin - это веб-страница, по умолчанию localhost:8001
. Как только вы запустите эти две службы, откройте localhost:8001
в вашем браузере, чтобы просмотреть и управлять базой данных.
Сценарий ниже не создает базу данных. Для этого используйте dynamodb-admin
.
maxSockets
конфигурации maxSockets
и изменив код, который создает массивы пакетов, чтобы он maxSockets
только отдельные пакеты вперед./**
* Uploads CSV data to DynamoDB.
*
* 1. Streams a CSV file line-by-line.
* 2. Parses each line to a JSON object.
* 3. Collects batches of JSON objects.
* 4. Converts batches into the PutRequest format needed by AWS.DynamoDB.batchWriteItem
* and runs 1 or more batches at a time.
*/
const AWS = require("aws-sdk")
const chalk = require('chalk')
const fs = require('fs')
const split = require('split2')
const uuid = require('uuid')
const through2 = require('through2')
const { Writable } = require('stream');
const { Transform } = require('stream');
const CSV_FILE_PATH = __dirname + "/../assets/whatever.csv"
// A whitelist of the CSV columns to ingest.
const CSV_KEYS = [
"id",
"name",
"city"
]
// Inadequate WCU will cause "insufficient throughput" exceptions, which in this script are not currently
// handled with retry attempts. Retries are not necessary as long as you consistently
// stay under the WCU, which isn't that hard to predict.
// The number of records to pass to AWS.DynamoDB.DocumentClient.batchWrite
// See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
const MAX_RECORDS_PER_BATCH = 25
// The number of batches to upload concurrently.
// https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/node-configuring-maxsockets.html
const MAX_CONCURRENT_BATCHES = 1
// MAKE SURE TO LAUNCH 'dynamodb-local' EXTERNALLY FIRST IF USING LOCALHOST!
AWS.config.update({
region: "us-west-1"
,endpoint: "http://localhost:8000" // Comment out to hit live DynamoDB service.
});
const db = new AWS.DynamoDB()
// Create a file line reader.
var fileReaderStream = fs.createReadStream(CSV_FILE_PATH)
var lineReaderStream = fileReaderStream.pipe(split())
var linesRead = 0
// Attach a stream that transforms text lines into JSON objects.
var skipHeader = true
var csvParserStream = lineReaderStream.pipe(
through2(
{
objectMode: true,
highWaterMark: 1
},
function handleWrite(chunk, encoding, callback) {
// ignore CSV header
if (skipHeader) {
skipHeader = false
callback()
return
}
linesRead++
// transform line into stringified JSON
const values = chunk.toString().split(',')
const ret = {}
CSV_KEYS.forEach((keyName, index) => {
ret[keyName] = values[index]
})
ret.line = linesRead
console.log(chalk.cyan.bold("csvParserStream:",
"line:", linesRead + ".",
chunk.length, "bytes.",
ret.id
))
callback(null, ret)
}
)
)
// Attach a stream that collects incoming json lines to create batches.
// Outputs an array (<= MAX_CONCURRENT_BATCHES) of arrays (<= MAX_RECORDS_PER_BATCH).
var batchingStream = (function batchObjectsIntoGroups(source) {
var batchBuffer = []
var idx = 0
var batchingStream = source.pipe(
through2.obj(
{
objectMode: true,
writableObjectMode: true,
highWaterMark: 1
},
function handleWrite(item, encoding, callback) {
var batchIdx = Math.floor(idx / MAX_RECORDS_PER_BATCH)
if (idx % MAX_RECORDS_PER_BATCH == 0 && batchIdx < MAX_CONCURRENT_BATCHES) {
batchBuffer.push([])
}
batchBuffer[batchIdx].push(item)
if (MAX_CONCURRENT_BATCHES == batchBuffer.length &&
MAX_RECORDS_PER_BATCH == batchBuffer[MAX_CONCURRENT_BATCHES-1].length)
{
this.push(batchBuffer)
batchBuffer = []
idx = 0
} else {
idx++
}
callback()
},
function handleFlush(callback) {
if (batchBuffer.length) {
this.push(batchBuffer)
}
callback()
}
)
)
return (batchingStream);
})(csvParserStream)
// Attach a stream that transforms batch buffers to collections of DynamoDB batchWrite jobs.
var databaseStream = new Writable({
objectMode: true,
highWaterMark: 1,
write(batchBuffer, encoding, callback) {
console.log(chalk.yellow('Batch being processed.'))
// Create 'batchBuffer.length' batchWrite jobs.
var jobs = batchBuffer.map(batch =>
buildBatchWriteJob(batch)
)
// Run multiple batch-write jobs concurrently.
Promise
.all(jobs)
.then(results => {
console.log(chalk.bold.red('${batchBuffer.length} batches completed.'))
})
.catch(error => {
console.log( chalk.red( "ERROR" ), error )
callback(error)
})
.then( () => {
console.log( chalk.bold.red("Resuming file input.") )
setTimeout(callback, 900) // slow down the uploads. calculate this based on WCU, item size, batch size, and concurrency level.
})
// return false
}
})
batchingStream.pipe(databaseStream)
// Builds a batch-write job that runs as an async promise.
function buildBatchWriteJob(batch) {
let params = buildRequestParams(batch)
// This was being used temporarily prior to hooking up the script to any dynamo service.
// let fakeJob = new Promise( (resolve, reject) => {
// console.log(chalk.green.bold( "Would upload batch:",
// pluckValues(batch, "line")
// ))
// let t0 = new Date().getTime()
// // fake timing
// setTimeout(function() {
// console.log(chalk.dim.yellow.italic('Batch upload time: ${new Date().getTime() - t0}ms'))
// resolve()
// }, 300)
// })
// return fakeJob
let promise = new Promise(
function(resolve, reject) {
let t0 = new Date().getTime()
let printItems = function(msg, items) {
console.log(chalk.green.bold(msg, pluckValues(batch, "id")))
}
let processItemsCallback = function (err, data) {
if (err) {
console.error('Failed at batch: ${pluckValues(batch, "line")}, ${pluckValues(batch, "id")}')
console.error("Error:", err)
reject()
} else {
var params = {}
params.RequestItems = data.UnprocessedItems
var numUnprocessed = Object.keys(params.RequestItems).length
if (numUnprocessed != 0) {
console.log('Encountered ${numUnprocessed}')
printItems("Retrying unprocessed items:", params)
db.batchWriteItem(params, processItemsCallback)
} else {
console.log(chalk.dim.yellow.italic('Batch upload time: ${new Date().getTime() - t0}ms'))
resolve()
}
}
}
db.batchWriteItem(params, processItemsCallback)
}
)
return (promise)
}
// Build request payload for the batchWrite
function buildRequestParams(batch) {
var params = {
RequestItems: {}
}
params.RequestItems.Provider = batch.map(obj => {
let item = {}
CSV_KEYS.forEach((keyName, index) => {
if (obj[keyName] && obj[keyName].length > 0) {
item[keyName] = { "S": obj[keyName] }
}
})
return {
PutRequest: {
Item: item
}
}
})
return params
}
function pluckValues(batch, fieldName) {
var values = batch.map(item => {
return (item[fieldName])
})
return (values)
}
Вы можете попробовать использовать пакетную запись и многопроцессорную обработку для ускорения массового импорта.
import csv
import time
import boto3
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
current_milli_time = lambda: int(round(time.time() * 1000))
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table_name')
def add_users_in_batch(data):
with table.batch_writer() as batch:
for item in data:
batch.put_item(Item = item)
def run_batch_migration():
start = current_milli_time()
row_count = 0
batch = []
batches = []
with open(CSV_PATH, newline = '') as csvfile:
reader = csv.reader(csvfile, delimiter = '\t', quotechar = '|')
for row in reader:
row_count += 1
item = {
'email': row[0],
'country': row[1]
}
batch.append(item)
if row_count % 25 == 0:
batches.append(batch)
batch = []
batches.append(batch)
pool.map(add_users_in_batch, batches)
print('Number of rows processed - ', str(row_count))
end = current_milli_time()
print('Total time taken for migration : ', str((end - start) / 1000), ' secs')
if __name__ == "__main__":
run_batch_migration()
Другой быстрый обходной путь - сначала загрузить CSV в RDS или любой другой экземпляр mysql, что довольно легко сделать (https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Introduction.html), а затем использовать DMS. (Служба миграции баз данных AWS) для загрузки всех данных в DynamodB. Вам нужно будет создать роль для DMS, прежде чем вы сможете загрузить данные. Но это прекрасно работает без запуска каких-либо сценариев.
Обновлен код Javascript 2019 года
Мне не повезло ни с одним из приведенных выше примеров кода Javascript. Начиная с ответа Хасана Сиддика, приведенного выше, я обновил до последней версии API, включил пример кода учетных данных, переместил всю пользовательскую конфигурацию наверх, добавил uuid() при отсутствии и удалил пустые строки.
const fs = require('fs');
const parse = require('csv-parse');
const async = require('async');
const uuid = require('uuid/v4');
const AWS = require('aws-sdk');
// --- start user config ---
const AWS_CREDENTIALS_PROFILE = 'serverless-admin';
const CSV_FILENAME = "./majou.csv";
const DYNAMODB_REGION = 'eu-central-1';
const DYNAMODB_TABLENAME = 'entriesTable';
// --- end user config ---
const credentials = new AWS.SharedIniFileCredentials({
profile: AWS_CREDENTIALS_PROFILE
});
AWS.config.credentials = credentials;
const docClient = new AWS.DynamoDB.DocumentClient({
region: DYNAMODB_REGION
});
const rs = fs.createReadStream(CSV_FILENAME);
const parser = parse({
columns: true,
delimiter: ','
}, function(err, data) {
var split_arrays = [],
size = 25;
while (data.length > 0) {
split_arrays.push(data.splice(0, size));
}
data_imported = false;
chunk_no = 1;
async.each(split_arrays, function(item_data, callback) {
const params = {
RequestItems: {}
};
params.RequestItems[DYNAMODB_TABLENAME] = [];
item_data.forEach(item => {
for (key of Object.keys(item)) {
// An AttributeValue may not contain an empty string
if (item[key] === '')
delete item[key];
}
params.RequestItems[DYNAMODB_TABLENAME].push({
PutRequest: {
Item: {
id: uuid(),
...item
}
}
});
});
docClient.batchWrite(params, function(err, res, cap) {
console.log('done going next');
if (err == null) {
console.log('Success chunk #' + chunk_no);
data_imported = true;
} else {
console.log(err);
console.log('Fail chunk #' + chunk_no);
data_imported = false;
}
chunk_no++;
callback();
});
}, function() {
// run after loops
console.log('all data imported....');
});
});
rs.pipe(parser);
Я создал драгоценный камень для этого.
Теперь вы можете установить его, запустив gem install dynamocli
, затем вы можете использовать команду:
dynamocli import your_data.csv --to your_table
Вот ссылка на исходный код: https://github.com/matheussilvasantos/dynamocli