Я достигаю узкого места в моем приложении и испытываю трудное время, находя решение вокруг него. Немного фона :
- Мое приложение подключает API для сбора информации о сотнях тысяч предметов и их хранения в хранилище данных
- Нам нужно выполнить простые агрегации по смешению размеров этих элементов, которые мы пытаемся вычислить за время хранения элементов
Текущая реализация:
- Мы запускаем загрузку этих элементов вручную по мере необходимости, что создает задачи на сервере, предназначенном для загрузки этих элементов. Каждая задача запускает больше задач в зависимости от # вызовов API, необходимых для разбивки на страницы и получения каждого элемента.
- Каждая задача будет загружать, анализировать и массово хранить элементы, сохраняя при этом скопления, которые мы хотим в памяти, используя словарь.
- В конце выполнения каждой задачи мы записываем словарь агрегатов в очередь на выгрузку.
- Как только мы обнаруживаем, что мы приближаемся к концу вызовов API, мы запускаем задачу агрегации во вторую конфигурацию бэкэнд
- Эта "задача агрегации" вытягивается из очереди вытягивания (20 за раз) и объединяет словари, найденные в каждой задаче (далее выполняются в агрегации памяти), прежде чем пытаться сохранить каждый агрегат. Эта задача также запускает другие задачи для выполнения агрегаций для оставшихся задач в очереди на выгрузку (сотни).
- Мы используем подход sharded counter, чтобы помочь разрешить любые конфликты при хранении в хранилище данных
- Каждая задача агрегации может попытаться сохранить 500-1500 агрегатов, которые должны быть независимо друг от друга
Там есть дополнительные проверки и таковые, чтобы обеспечить правильную обработку всех задач очереди очередей и все элементы загружены.
Проблема:
Мы хотим как можно быстрее загрузить и сохранить все элементы и агрегаты. У меня есть 20 экземпляров для каждой описанной конфигурации брандмауэра (я буду называть их "бэкэнд" агрегатора и "загрузчик" ). Бэкэнд загрузчика, похоже, довольно быстро проходит через API-запросы. Я использую библиотеку NDB и асинхронные вызовы Fetches/Datastore URL для получения этого. Я также включил threadafe: true, чтобы ни один экземпляр не ожидал завершения RPC-вызовов до начала следующей задачи (все задачи могут работать независимо друг от друга и являются идемпотентными).
Бэкэнд агрегатора - это то место, где играет большое время. Хранение 500-1500 этих агрегатов асинхронно через транзакции занимает 40 секунд или более (и я даже не думаю, что все транзакции должным образом совершены). Я сохраняю этот сервер с потокобезопасным: false, так как я использую конечный срок истечения очереди по истечении 300 секунд, но если я разрешаю выполнить несколько задач в одном экземпляре, они могут каскадироваться и нажимать на завершение некоторых задач за 300 секунд mark, тем самым позволяя другой задаче выполнить одну и ту же задачу во второй раз и, возможно, выполнить двойной учет.
В журналах отображается BadRequestError: Nested transactions are not supported.
с предыдущей ошибкой (в трассировке стека) TransactionFailedError: too much contention on these datastore entities. please try again.
. Другая ошибка, которую я обычно вижу, - BadRequestError(The referenced transaction has expired or is no longer valid.)
По моему мнению, иногда эти ошибки означают, что транзакция может быть совершена без дальнейшего взаимодействия. Как узнать, правильно ли это было сделано? Я делаю это логично/эффективно или есть больше места для concurrency без риска повредить все?
Соответствующий код:
class GeneralShardConfig(ndb.Model):
"""Tracks the number of shards for each named counter."""
name = ndb.StringProperty(required=True)
num_shards = ndb.IntegerProperty(default=4)
class GeneralAggregateShard(ndb.Model):
"""Shards for each named counter"""
name = ndb.StringProperty(name='n', required=True)
count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now
@ndb.tasklet
def increment_batch(data_set):
def run_txn(name, value):
@ndb.tasklet
def txn():
to_put = []
dbkey = ndb.Key(GeneralShardConfig, name)
config = yield dbkey.get_async(use_memcache=False)
if not config:
config = GeneralShardConfig(key=dbkey,name=name)
to_put.append(config)
index = random.randint(0, config.num_shards-1)
shard_name = name + str(index)
dbkey = ndb.Key(GeneralAggregateShard, shard_name)
counter = yield dbkey.get_async()
if not counter:
counter = GeneralAggregateShard(key=dbkey, name=name)
counter.count += value
to_put.append(counter)
yield ndb.put_multi_async(to_put)
return ndb.transaction_async(txn, use_memcache=False, xg=True)
res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
raise ndb.Return(res)
Учитывая реализацию, единственная комната для "соперничества", которую я вижу, заключается в том, что если две или более совокупные задачи должны обновлять одно и то же имя агрегата, которое не должно происходить слишком часто, и с закрытыми счетчиками я бы ожидал, что это перекрытие будет редко, если вообще произойдет. Я предполагаю, что
BadRequestError(The referenced transaction has expired or is no longer valid.)
появляется ошибка, когда цикл события проверяет состояние всех талисманов и показывает ссылку на завершенную транзакцию. Проблема здесь в том, что это ошибки, значит ли это, что все транзакции преждевременно отрезаны или я могу предположить, что все транзакции прошли? Я также предполагаю, что эта строка res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
должна быть разбита на try/except, за исключением каждой маркеры, чтобы обнаружить эти ошибки.
До того, как я сокрушусь над этим, я был бы признателен за любое руководство/помощь в том, как оптимизировать этот процесс и сделать это надежным способом.
РЕДАКТИРОВАТЬ 1: Я изменил поведение задачи агрегатора следующим образом:
- Если из очереди было арендовано более 1 задачи, агрегируйте задачи в памяти, затем сохраните результат в другой задаче в очереди на загрузку и сразу запустите еще одну "задачу агрегатора"
- Иначе, если 1 задача была арендована, попробуйте сохранить результаты
Это помогло уменьшить конфликтующие ошибки, которые я видел, но он все еще не очень надежный. Совсем недавно я нажал BadRequestError: Nested transactions are not supported.
на трассировку стека, указав RuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>
Я полагаю, что эта модификация должна оптимизировать процесс, позволяя объединять все возможные совпадения в процессе агрегации и одновременно проверять их в одном экземпляре, а также несколько экземпляров, которые совершают транзакции, которые могут столкнуться. У меня все еще есть проблемы с сохранением результатов в надежной манере.