Я столкнулся с интересным лимитом с хранилищем данных App Engine. Я создаю обработчик, чтобы помочь нам проанализировать некоторые данные об использовании на одном из наших производственных серверов. Чтобы выполнить анализ, мне нужно запросить и суммировать 10 000+ объектов, извлеченных из хранилища данных. Расчет не является трудным, это всего лишь гистограмма элементов, которые передают конкретный фильтр образцов использования. Проблема, с которой я сталкиваюсь, заключается в том, что я не могу получить данные обратно из хранилища достаточно быстро, чтобы выполнить какую-либо обработку, прежде чем нажать на крайний срок запроса.
Я пробовал все, что я могу придумать, чтобы разделить запрос на параллельные вызовы RPC для повышения производительности, но, согласно appstats, я не могу заставить запросы фактически выполнять параллельно. Независимо от того, какой метод я пытаюсь (см. Ниже), всегда кажется, что RPC возвращается к водопаду последовательных последующих запросов.
Примечание: код запроса и анализа работает, он работает медленно, потому что я не могу получить данные достаточно быстро из хранилища данных.
Фон
У меня нет живой версии, которую я могу поделиться, но вот базовая модель для той части системы, о которой я говорю:
class Session(ndb.Model):
""" A tracked user session. (customer account (company), version, OS, etc) """
data = ndb.JsonProperty(required = False, indexed = False)
class Sample(ndb.Model):
name = ndb.StringProperty (required = True, indexed = True)
session = ndb.KeyProperty (required = True, kind = Session)
timestamp = ndb.DateTimeProperty(required = True, indexed = True)
tags = ndb.StringProperty (repeated = True, indexed = True)
Вы можете рассматривать образцы как раз, когда пользователь использует возможность имени. (например: 'systemA.feature_x'). Теги основаны на деталях клиента, системной информации и функции. ex: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']). Таким образом, теги образуют денормализованный набор токенов, которые можно использовать для поиска интересующих образцов.
Анализ, который я пытаюсь сделать, состоит в том, чтобы использовать диапазон дат и задавать, сколько раз было функцией набора функций (возможно, всех функций), используемых в день (или в час) на учетную запись клиента (компания, а не на пользователя).
Таким образом, вход в обработчик будет выглядеть следующим образом:
- Дата начала
- Конечная дата
- Tag (s)
Вывод:
[{
'company_account': <string>,
'counts': [
{'timeperiod': <iso8601 date>, 'count': <int>}, ...
]
}, ...
]
Общий код для запросов
Вот несколько общих кодов для всех запросов. Общая структура обработчика - простой обработчик get using webapp2, который настраивает параметры запроса, запускает запрос, обрабатывает результаты, создает данные для возврата.
# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500 # Bring in large groups of entities
q = Sample.query()
q = q.order(Sample.timestamp)
# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))
def handle_sample(sample):
session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
Пробы методов
Я попробовал различные методы, чтобы попытаться извлечь данные из хранилища данных как можно быстрее и параллельно. Методы, которые я пробовал до сих пор, включают:
A. Одиночная итерация
Это более простой базовый случай для сравнения с другими методами. Я просто создаю запрос и перебираю все элементы, позволяя ndb делать то, что он делает, чтобы вытаскивать их один за другим.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)
for sample in q_iter:
handle_sample(sample)
В. Большая выборка
Идея здесь состояла в том, чтобы увидеть, могу ли я сделать одну очень большую выборку.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)
for sample in samples:
handle_sample(sample)
С. Async выбирает диапазон времени
Идея здесь состоит в том, чтобы распознать, что образцы достаточно хорошо распределены во времени, поэтому я могу создать набор независимых запросов, которые разбивают общую область времени на куски и пытаются запустить каждую из них параллельно с помощью async:
# split up timestamp space into 20 equal parts and async query each of them
ts_delta = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []
for x in range(ts_intervals):
cur_end_time = (cur_start_time + ts_delta)
if x == (ts_intervals-1): # Last one has to cover full range
cur_end_time = end_time
f = q.filter(Sample.timestamp >= cur_start_time,
Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
q_futures.append(f)
cur_start_time = cur_end_time
# Now loop through and collect results
for f in q_futures:
samples = f.get_result()
for sample in samples:
handle_sample(sample)
D. Асинхронное отображение
Я попробовал этот метод, потому что в документации это звучало как ndb, может использовать некоторое parallelism автоматически при использовании метода Query.map_async.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
@ndb.tasklet
def process_sample(sample):
period_ts = getPeriodTimestamp(sample.timestamp)
session_obj = yield sample.session.get_async() # Lookup the session object from cache
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
raise ndb.Return(None)
q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
Результат
Я проверил один пример запроса, чтобы собрать общее время отклика и трассировки appstats. Результаты:
A. Одиночная итерация
real: 15.645s
Эта последовательность последовательно проходит через выборку пакетов один за другим, а затем извлекает каждый сеанс из memcache.
В. Большая выборка
real: 12.12s
Эффективно то же, что и опция A, но по какой-то причине немного быстрее.
С. Async выбирает диапазон времени
real: 15.251s
Появляется, чтобы предоставить больше parallelism в начале, но, похоже, замедлится последовательностью вызовов к следующей во время итерации результатов. Также не похоже, что они могут перекрывать запросы memcache сеанса с ожидающими запросами.
D. Асинхронное отображение
real: 13.752s
Это труднее всего понять. Похоже, что он имеет много перекрытий, но все, кажется, растягивается в водопаде, а не параллельно.
Рекомендации
Основываясь на этом, чего мне не хватает? Я просто нажимаю лимит на App Engine или есть лучший способ вывести большое количество объектов параллельно?
Я не понимаю, что делать дальше. Я подумал о том, чтобы переписывать клиента, чтобы несколько запросов к движку приложения параллельно, но это выглядит довольно грубой силой. Я бы действительно ожидал, что движок приложения сможет обрабатывать этот прецедент, поэтому я предполагаю, что что-то мне не хватает.
Update
В конце концов я обнаружил, что вариант C был лучшим для моего случая. Я смог оптимизировать его за 6,1 секунды. Все еще не идеально, но гораздо лучше.
После получения консультаций от нескольких людей я обнаружил, что следующие пункты были ключевыми для понимания и учета:
- Несколько запросов могут выполняться параллельно
- Только 10 RPC могут быть в полете сразу
- Попробуйте денормализовать до такой степени, что вторичных запросов нет.
- Этот тип задачи лучше оставить карте сокращения и очереди задач, а не в режиме реального времени
Итак, что я сделал, чтобы сделать это быстрее:
- Я разбил пространство запросов с начала по времени. (обратите внимание: чем больше равных разделов в терминах возвращаемых объектов, тем лучше)
- Я денормировал данные дальше, чтобы удалить необходимость в запросе вторичного сеанса
- Я использовал операции nynb async и wait_any() для перекрытия запросов с обработкой
Я все еще не получаю производительность, которую я ожидал бы или хотел бы, но теперь она работоспособна. Я просто хочу, чтобы их было лучшим способом быстро вывести большое количество последовательных объектов в память в обработчиках.