Подтвердить что ты не робот

Как загрузить данные навалом в хранилище приложений? Старые методы не работают

Это должно быть довольно распространенным требованием и простым процессом: загрузка данных навалом в хранилище данных appengine.

Однако ни одно из старых решений, упомянутых в stackoverflow (ссылки ниже *), похоже, больше не работает. Метод bulkloader, который был самым разумным решением при загрузке в хранилище данных с использованием API DB, не работает с API NDB

И теперь метод bulkloader, кажется, устарел, а старые ссылки, которые все еще присутствуют в документах, приводят к неправильной странице. Вот пример

https://developers.google.com/appengine/docs/python/tools/uploadingdata

Эта ссылка выше по-прежнему присутствует на этой странице: https://developers.google.com/appengine/docs/python/tools/uploadinganapp

Каков рекомендуемый метод для одновременной загрузки данных?

Двумя возможными альтернативами кажутся 1) использование remote_api или 2) запись CSV файла в ведро GCS и чтение из него. У кого-нибудь есть опыт успешного использования любого метода?

Любые указатели будут очень благодарны. Спасибо!

[* Решения, предлагаемые по приведенным ниже ссылкам, более не действительны]

[1] Как отправить данные навалом в хранилище Google appengine?

[2] Как вставить массовые данные в хранилище данных Google App Engine?

4b9b3361

Ответ 1

Некоторые из вас могут быть в моей ситуации: я не могу использовать утилиту импорта/экспорта хранилища данных, потому что мои данные должны быть преобразованы, прежде чем попасть в хранилище данных.

В итоге я использовал apache-beam (поток данных Google Cloud).

Вам нужно всего лишь написать несколько строк кода "луч", чтобы

  • прочитайте свои данные (например, размещены в облачном хранилище) - вы получаете PCollection строк,
  • выполняйте любое преобразование, которое вы хотите (так что вы получаете PCollection объектов хранилища данных),
  • сбрасывать их в раковину хранилища данных.

См. Раздел Как ускорить массовый импорт в облачный хранилище Google с несколькими рабочими? для конкретного варианта использования.

Я мог писать со скоростью 800 единиц в секунду в моем хранилище данных с 5 рабочими. Это позволило мне завершить задачу импорта (с 16 миллионами строк) примерно через 5 часов. Если вы хотите сделать это быстрее, используйте больше работников: D

Ответ 2

Способ 1: Использовать remote_api

Как: напишите файл bulkloader.yaml и запустите его напрямую, используя команду appcfg.py upload_data из терминала Я не рекомендую этот метод по нескольким причинам: 1. огромная латентность 2. отсутствие поддержки NDB

Метод 2: GCS и использование mapreduce

Загрузка файла данных в GCS:

Используйте " storage-file-transfer-json-python" проект github (chunked_transfer.py) для загрузки файлов в gcs из вашей локальной системы. Обязательно создайте файл "client-secrets.json" из консоли администратора приложения.

MapReduce:

Используйте проект appengine-mapreduce "github. Скопируйте папку" mapreduce" в папку верхнего уровня вашего проекта.

Добавьте строку ниже в файл app.yaml:

includes:
  - mapreduce/include.yaml

Ниже приведен ваш файл main.py

import cgi
import webapp2
import logging
import os, csv
from models import DataStoreModel
import StringIO
from google.appengine.api import app_identity
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce.input_readers import InputReader

def testmapperFunc(newRequest):
    f = StringIO.StringIO(newRequest)
    reader = csv.reader(f, delimiter=',')
    for row in reader:
        newEntry = DataStoreModel(attr1=row[0], link=row[1])
        yield op.db.Put(newEntry)

class TestGCSReaderPipeline(base_handler.PipelineBase):
    def run(self, filename):
        yield mapreduce_pipeline.MapreducePipeline(
                "test_gcs",
                "testgcs.testmapperFunc",
                "mapreduce.input_readers.FileInputReader",
                mapper_params={
                    "files": [filename],
                    "format": 'lines'
                },
                shards=1)

class tempTestRequestGCSUpload(webapp2.RequestHandler):
    def get(self):
        bucket_name = os.environ.get('BUCKET_NAME',
                                     app_identity.get_default_gcs_bucket_name())

        bucket = '/gs/' + bucket_name
        filename = bucket + '/' + 'tempfile.csv'

        pipeline = TestGCSReaderPipeline(filename)
        pipeline.with_params(target="mapreducetestmodtest")
        pipeline.start()
        self.response.out.write('done')

application = webapp2.WSGIApplication([
    ('/gcsupload', tempTestRequestGCSUpload),
], debug=True)

Чтобы помнить:

  • Проект Mapreduce использует устаревший "API файлов облачных хранилищ Google". Поэтому поддержка в будущем не гарантируется.
  • Снижение карты добавляет небольшие накладные расходы на чтение и запись хранилища данных.

Метод 3: Клиентская библиотека GCS и GCS

  • Загрузите файл csv/text в gcs, используя вышеупомянутый метод передачи файлов.
  • Используйте библиотеку клиентов gcs (скопируйте папку "cloudstorage" в папку верхнего уровня приложения).

Добавьте приведенный ниже код в файл main.py приложения.

import cgi
import webapp2
import logging
import jinja2
import os, csv
import cloudstorage as gcs
from google.appengine.ext import ndb
from google.appengine.api import app_identity
from models import DataStoreModel

class UploadGCSData(webapp2.RequestHandler):
    def get(self):
        bucket_name = os.environ.get('BUCKET_NAME',
                                     app_identity.get_default_gcs_bucket_name())
        bucket = '/' + bucket_name
        filename = bucket + '/tempfile.csv'
        self.upload_file(filename)

    def upload_file(self, filename):
        gcs_file = gcs.open(filename)
        datareader = csv.reader(gcs_file)
        count = 0
        entities = []
        for row in datareader:
            count += 1
                newProd = DataStoreModel(attr1=row[0], link=row[1])
                entities.append(newProd)

            if count%50==0 and entities:
                ndb.put_multi(entities)
                entities=[]

        if entities:
            ndb.put_multi(entities)

application = webapp2.WSGIApplication([
    ('/gcsupload', UploadGCSData),
], debug=True)

Ответ 3

Удаленный API-метод, как показано в вашей ссылке [1], все еще работает нормально - хотя он очень медленный, если у вас более нескольких сотен строк.

Я успешно использовал GCS совместно с картой MapReduce, чтобы загружать, а не загружать содержимое хранилища данных, но принципы должны быть одинаковыми. См. Документацию mapreduce: на самом деле вам нужен только шаг mapper, поэтому вы можете определить простую функцию, которая принимает строку из вашего CSV и создает datastore из этих данных.