Подтвердить что ты не робот

Лучшая практика для запроса большого количества объектов ndb из хранилища данных

Я столкнулся с интересным лимитом с хранилищем данных App Engine. Я создаю обработчик, чтобы помочь нам проанализировать некоторые данные об использовании на одном из наших производственных серверов. Чтобы выполнить анализ, мне нужно запросить и суммировать 10 000+ объектов, извлеченных из хранилища данных. Расчет не является трудным, это всего лишь гистограмма элементов, которые передают конкретный фильтр образцов использования. Проблема, с которой я сталкиваюсь, заключается в том, что я не могу получить данные обратно из хранилища достаточно быстро, чтобы выполнить какую-либо обработку, прежде чем нажать на крайний срок запроса.

Я пробовал все, что я могу придумать, чтобы разделить запрос на параллельные вызовы RPC для повышения производительности, но, согласно appstats, я не могу заставить запросы фактически выполнять параллельно. Независимо от того, какой метод я пытаюсь (см. Ниже), всегда кажется, что RPC возвращается к водопаду последовательных последующих запросов.

Примечание: код запроса и анализа работает, он работает медленно, потому что я не могу получить данные достаточно быстро из хранилища данных.

Фон

У меня нет живой версии, которую я могу поделиться, но вот базовая модель для той части системы, о которой я говорю:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

Вы можете рассматривать образцы как раз, когда пользователь использует возможность имени. (например: 'systemA.feature_x'). Теги основаны на деталях клиента, системной информации и функции. ex: ['winxp', '2.5.1', ​​'systemA', 'feature_x', 'premium_account']). Таким образом, теги образуют денормализованный набор токенов, которые можно использовать для поиска интересующих образцов.

Анализ, который я пытаюсь сделать, состоит в том, чтобы использовать диапазон дат и задавать, сколько раз было функцией набора функций (возможно, всех функций), используемых в день (или в час) на учетную запись клиента (компания, а не на пользователя).

Таким образом, вход в обработчик будет выглядеть следующим образом:

  • Дата начала
  • Конечная дата
  • Tag (s)

Вывод:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

Общий код для запросов

Вот несколько общих кодов для всех запросов. Общая структура обработчика - простой обработчик get using webapp2, который настраивает параметры запроса, запускает запрос, обрабатывает результаты, создает данные для возврата.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

Пробы методов

Я попробовал различные методы, чтобы попытаться извлечь данные из хранилища данных как можно быстрее и параллельно. Методы, которые я пробовал до сих пор, включают:

A. Одиночная итерация

Это более простой базовый случай для сравнения с другими методами. Я просто создаю запрос и перебираю все элементы, позволяя ndb делать то, что он делает, чтобы вытаскивать их один за другим.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

В. Большая выборка

Идея здесь состояла в том, чтобы увидеть, могу ли я сделать одну очень большую выборку.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

С. Async выбирает диапазон времени

Идея здесь состоит в том, чтобы распознать, что образцы достаточно хорошо распределены во времени, поэтому я могу создать набор независимых запросов, которые разбивают общую область времени на куски и пытаются запустить каждую из них параллельно с помощью async:

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. Асинхронное отображение

Я попробовал этот метод, потому что в документации это звучало как ndb, может использовать некоторое parallelism автоматически при использовании метода Query.map_async.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Результат

Я проверил один пример запроса, чтобы собрать общее время отклика и трассировки appstats. Результаты:

A. Одиночная итерация

real: 15.645s

Эта последовательность последовательно проходит через выборку пакетов один за другим, а затем извлекает каждый сеанс из memcache.

Method A appstats

В. Большая выборка

real: 12.12s

Эффективно то же, что и опция A, но по какой-то причине немного быстрее.

Method B appstats

С. Async выбирает диапазон времени

real: 15.251s

Появляется, чтобы предоставить больше parallelism в начале, но, похоже, замедлится последовательностью вызовов к следующей во время итерации результатов. Также не похоже, что они могут перекрывать запросы memcache сеанса с ожидающими запросами.

Method C appstats

D. Асинхронное отображение

real: 13.752s

Это труднее всего понять. Похоже, что он имеет много перекрытий, но все, кажется, растягивается в водопаде, а не параллельно.

Method D appstats

Рекомендации

Основываясь на этом, чего мне не хватает? Я просто нажимаю лимит на App Engine или есть лучший способ вывести большое количество объектов параллельно?

Я не понимаю, что делать дальше. Я подумал о том, чтобы переписывать клиента, чтобы несколько запросов к движку приложения параллельно, но это выглядит довольно грубой силой. Я бы действительно ожидал, что движок приложения сможет обрабатывать этот прецедент, поэтому я предполагаю, что что-то мне не хватает.

Update

В конце концов я обнаружил, что вариант C был лучшим для моего случая. Я смог оптимизировать его за 6,1 секунды. Все еще не идеально, но гораздо лучше.

После получения консультаций от нескольких людей я обнаружил, что следующие пункты были ключевыми для понимания и учета:

  • Несколько запросов могут выполняться параллельно
  • Только 10 RPC могут быть в полете сразу
  • Попробуйте денормализовать до такой степени, что вторичных запросов нет.
  • Этот тип задачи лучше оставить карте сокращения и очереди задач, а не в режиме реального времени

Итак, что я сделал, чтобы сделать это быстрее:

  • Я разбил пространство запросов с начала по времени. (обратите внимание: чем больше равных разделов в терминах возвращаемых объектов, тем лучше)
  • Я денормировал данные дальше, чтобы удалить необходимость в запросе вторичного сеанса
  • Я использовал операции nynb async и wait_any() для перекрытия запросов с обработкой

Я все еще не получаю производительность, которую я ожидал бы или хотел бы, но теперь она работоспособна. Я просто хочу, чтобы их было лучшим способом быстро вывести большое количество последовательных объектов в память в обработчиках.

4b9b3361

Ответ 1

Большая обработка, подобная этой, не должна выполняться в запросе пользователя, который имеет ограничение по 60 с. Вместо этого это должно быть сделано в контексте, который поддерживает длительные запросы. очередь задач поддерживает запросы до 10 минут и (я считаю) нормальные ограничения памяти (экземпляры F1 по умолчанию имеют 128 МБ памяти). Для еще более высоких пределов (без тайм-аута запроса, 1 ГБ + памяти) используйте backends.

Здесь что-то попробовать: настройте URL-адрес, который при доступе запускает задачу очереди задач. Он возвращает веб-страницу, которая опросает каждые ~ 5 с другим URL-адресом, который отвечает true/false, если задача очереди задач еще не завершена. Очередь задач обрабатывает данные, которые могут занимать около 10 секунд, и сохраняет результат в хранилище данных в виде вычисленных данных или отображаемой веб-страницы. Как только начальная страница обнаруживает, что она была завершена, пользователь перенаправляется на страницу, которая извлекает теперь вычисленные результаты из хранилища данных.

Ответ 2

Новая экспериментальная функция Обработка данных (API AppEngine для MapReduce) выглядит очень подходящей для решения этой проблемы. Он выполняет автоматическую настройку для выполнения нескольких параллельных рабочих процессов.

Ответ 3

Большие операции с данными в App Engine наилучшим образом реализованы с использованием какой-либо операции mapreduce.

Здесь видео, описывающее процесс, но включающее BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/

Не похоже, что вам нужен BigQuery, но вы, вероятно, захотите использовать как Map, так и уменьшить часть конвейера.

Основное различие между тем, что вы делаете и ситуацией с mapreduce, заключается в том, что вы запускаете один экземпляр и выполняете итерацию по запросам, где на mapreduce у вас будет отдельный экземпляр, работающий параллельно для каждого запроса. Вам понадобится операция уменьшения, чтобы "суммировать" все данные и записать результат где-нибудь.

Другая проблема заключается в том, что вы должны использовать курсоры для итерации. https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

Если итератор использует смещение запроса, он будет неэффективным, поскольку смещение выдает один и тот же запрос, пропускает несколько результатов и дает следующий набор, в то время как курсор переходит прямо к следующему набору.

Ответ 4

У меня есть аналогичная проблема, и после работы с поддержкой Google в течение нескольких недель я могу подтвердить, что нет волшебного решения, по крайней мере, с декабря 2017 года.

tl; dr: Можно ожидать пропускную способность от 220 сущ./сек. для стандартного SDK, работающего на экземпляре B1, до 900 сущностей/секунду для исправленного SDK, работающего на экземпляре B8.

Ограничение связано с процессором, а изменение типа instanced напрямую влияет на производительность. Это подтверждают аналогичные результаты, полученные в экземплярах B4 и B4_1G

Наилучшая пропускная способность, которую я получил для объекта Expando с примерно 30 полями:

Стандартный SDE SDE

  • Пример B1: ~ 220 сущ./сек.
  • B2 экземпляр: ~ 250 объектов в секунду
  • Пример B4: ~ 560 сущ./сек.
  • экземпляр B4_1G: ~ 560 сущ./сек.
  • Пример B8: ~ 650 сущ./сек.

Патч GAE SDK

  • Пример B1: ~ 420 сущ./сек.
  • Пример B8: ~ 900 сущ./сек.

Для стандартного GAE SDK я пробовал различные подходы, включая многопоточность, но лучше всего было fetch_async с wait_any. Текущая библиотека NDB уже отлично справляется с использованием async и фьючерсов под капотом, поэтому любая попытка надавить на использование потоков только ухудшит работу.

Я нашел два интересных подхода для оптимизации этого:

Мэтт Фаус очень хорошо объясняет проблему:

GAE SDK предоставляет API для чтения и записи объектов, полученных из ваши классы в хранилище данных. Это избавит вас от скучной работы проверка исходных данных, возвращаемых из хранилища данных и их переупаковка в простой в использовании объект. В частности, GAE использует протокольные буферы передавать необработанные данные из хранилища на внешнюю машину, которая нуждается в Это. Затем SDK отвечает за декодирование этого формата и возвращение чистый объект для вашего кода. Эта утилита замечательная, но иногда она делает немного больше работы, чем хотелось бы. [...] Использование нашего профилирования инструмент, я обнаружил, что полностью 50% времени, потраченного на получение этих сущности находились во время фазы декодирования протобуфа-питона-объекта. Эта означает, что ЦП на внешнем сервере был узким местом в этих datastore читает!

GAE-data-access-web-request

Оба подхода пытаются сократить время, затрачиваемое на протобуф на декодирование Python, уменьшив количество декодированных полей.

Я пробовал оба подхода, но мне удалось добиться успеха только с Мэттом. Внутренние SDK изменились с тех пор, как Эван опубликовал свое решение. Мне пришлось немного изменить код, опубликованный Matt здесь, но это было довольно легко - если есть интерес, я могу опубликовать окончательный код.

Для регулярного объекта Expando с примерно 30 полями я использовал решение Matt для декодирования только парных полей и получил значительное улучшение.

В заключение нужно планировать соответственно и не ожидайте, что сможете обрабатывать гораздо больше нескольких сотен объектов в запросе GAE "в реальном времени".