Подтвердить что ты не робот

Надежный способ выполнения тысяч независимых транзакций?

Я достигаю узкого места в моем приложении и испытываю трудное время, находя решение вокруг него. Немного фона :

  • Мое приложение подключает API для сбора информации о сотнях тысяч предметов и их хранения в хранилище данных
  • Нам нужно выполнить простые агрегации по смешению размеров этих элементов, которые мы пытаемся вычислить за время хранения элементов

Текущая реализация:

  • Мы запускаем загрузку этих элементов вручную по мере необходимости, что создает задачи на сервере, предназначенном для загрузки этих элементов. Каждая задача запускает больше задач в зависимости от # вызовов API, необходимых для разбивки на страницы и получения каждого элемента.
  • Каждая задача будет загружать, анализировать и массово хранить элементы, сохраняя при этом скопления, которые мы хотим в памяти, используя словарь.
  • В конце выполнения каждой задачи мы записываем словарь агрегатов в очередь на выгрузку.
  • Как только мы обнаруживаем, что мы приближаемся к концу вызовов API, мы запускаем задачу агрегации во вторую конфигурацию бэкэнд
  • Эта "задача агрегации" вытягивается из очереди вытягивания (20 за раз) и объединяет словари, найденные в каждой задаче (далее выполняются в агрегации памяти), прежде чем пытаться сохранить каждый агрегат. Эта задача также запускает другие задачи для выполнения агрегаций для оставшихся задач в очереди на выгрузку (сотни).
  • Мы используем подход sharded counter, чтобы помочь разрешить любые конфликты при хранении в хранилище данных
  • Каждая задача агрегации может попытаться сохранить 500-1500 агрегатов, которые должны быть независимо друг от друга

Там есть дополнительные проверки и таковые, чтобы обеспечить правильную обработку всех задач очереди очередей и все элементы загружены.

Проблема:

Мы хотим как можно быстрее загрузить и сохранить все элементы и агрегаты. У меня есть 20 экземпляров для каждой описанной конфигурации брандмауэра (я буду называть их "бэкэнд" агрегатора и "загрузчик" ). Бэкэнд загрузчика, похоже, довольно быстро проходит через API-запросы. Я использую библиотеку NDB и асинхронные вызовы Fetches/Datastore URL для получения этого. Я также включил threadafe: true, чтобы ни один экземпляр не ожидал завершения RPC-вызовов до начала следующей задачи (все задачи могут работать независимо друг от друга и являются идемпотентными).

Бэкэнд агрегатора - это то место, где играет большое время. Хранение 500-1500 этих агрегатов асинхронно через транзакции занимает 40 секунд или более (и я даже не думаю, что все транзакции должным образом совершены). Я сохраняю этот сервер с потокобезопасным: false, так как я использую конечный срок истечения очереди по истечении 300 секунд, но если я разрешаю выполнить несколько задач в одном экземпляре, они могут каскадироваться и нажимать на завершение некоторых задач за 300 секунд mark, тем самым позволяя другой задаче выполнить одну и ту же задачу во второй раз и, возможно, выполнить двойной учет.

В журналах отображается BadRequestError: Nested transactions are not supported. с предыдущей ошибкой (в трассировке стека) TransactionFailedError: too much contention on these datastore entities. please try again.. Другая ошибка, которую я обычно вижу, - BadRequestError(The referenced transaction has expired or is no longer valid.)

По моему мнению, иногда эти ошибки означают, что транзакция может быть совершена без дальнейшего взаимодействия. Как узнать, правильно ли это было сделано? Я делаю это логично/эффективно или есть больше места для concurrency без риска повредить все?

Соответствующий код:

class GeneralShardConfig(ndb.Model):
    """Tracks the number of shards for each named counter."""
    name = ndb.StringProperty(required=True)
    num_shards = ndb.IntegerProperty(default=4)

class GeneralAggregateShard(ndb.Model):
    """Shards for each named counter"""
    name = ndb.StringProperty(name='n', required=True)
    count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now

@ndb.tasklet
def increment_batch(data_set):
    def run_txn(name, value):
        @ndb.tasklet
        def txn():
            to_put = []
            dbkey = ndb.Key(GeneralShardConfig, name)
            config = yield dbkey.get_async(use_memcache=False)
            if not config:
                config = GeneralShardConfig(key=dbkey,name=name)
                to_put.append(config)
            index = random.randint(0, config.num_shards-1)
            shard_name =  name + str(index)
            dbkey = ndb.Key(GeneralAggregateShard, shard_name)
            counter = yield dbkey.get_async()
            if not counter:
                counter = GeneralAggregateShard(key=dbkey, name=name)
            counter.count += value
            to_put.append(counter)
            yield ndb.put_multi_async(to_put)
        return ndb.transaction_async(txn, use_memcache=False, xg=True)
    res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
    raise ndb.Return(res)

Учитывая реализацию, единственная комната для "соперничества", которую я вижу, заключается в том, что если две или более совокупные задачи должны обновлять одно и то же имя агрегата, которое не должно происходить слишком часто, и с закрытыми счетчиками я бы ожидал, что это перекрытие будет редко, если вообще произойдет. Я предполагаю, что BadRequestError(The referenced transaction has expired or is no longer valid.) появляется ошибка, когда цикл события проверяет состояние всех талисманов и показывает ссылку на завершенную транзакцию. Проблема здесь в том, что это ошибки, значит ли это, что все транзакции преждевременно отрезаны или я могу предположить, что все транзакции прошли? Я также предполагаю, что эта строка res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00] должна быть разбита на try/except, за исключением каждой маркеры, чтобы обнаружить эти ошибки.

До того, как я сокрушусь над этим, я был бы признателен за любое руководство/помощь в том, как оптимизировать этот процесс и сделать это надежным способом.

РЕДАКТИРОВАТЬ 1: Я изменил поведение задачи агрегатора следующим образом:

  • Если из очереди было арендовано более 1 задачи, агрегируйте задачи в памяти, затем сохраните результат в другой задаче в очереди на загрузку и сразу запустите еще одну "задачу агрегатора"
  • Иначе, если 1 задача была арендована, попробуйте сохранить результаты

Это помогло уменьшить конфликтующие ошибки, которые я видел, но он все еще не очень надежный. Совсем недавно я нажал BadRequestError: Nested transactions are not supported. на трассировку стека, указав RuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>

Я полагаю, что эта модификация должна оптимизировать процесс, позволяя объединять все возможные совпадения в процессе агрегации и одновременно проверять их в одном экземпляре, а также несколько экземпляров, которые совершают транзакции, которые могут столкнуться. У меня все еще есть проблемы с сохранением результатов в надежной манере.

4b9b3361

Ответ 1

Уменьшая ввод/вывод хранилища данных (оставляя работу автобаттерам и отключая индексирование), вы можете быть более уверены в том, что хранилище данных завершено (меньше конфликтов), и оно должно быть быстрее.

Консоль конфигурации (переименованный счетчик) получает вне транзакции (транзакций) и может выполняться одновременно во время цикла транзакций.

Методы и общее свойство были добавлены в Counter to (надеюсь), чтобы облегчить его изменение в будущем.

Создано новое свойство ndb для поддержки десятичной запятой (при условии, что вы указываете 0.00 вместо 0.0).

EDIT:

Устранили необходимость в транзакциях и поменяли систему осколков на надежность.

import webapp2

import copy
import decimal
import logging
import random
import string

from google.appengine.api import datastore_errors
from google.appengine.datastore import entity_pb
from google.appengine.ext import deferred
from google.appengine.ext import ndb


TEST_BATCH_SIZE = 250
TEST_NAME_LEN = 12


class DecimalProperty(ndb.Property):
    """A Property whose value is a decimal.Decimal object."""

    def _datastore_type(self, value):
      return str(value)

    def _validate(self, value):
      if not isinstance(value, decimal.Decimal):
        raise datastore_errors.BadValueError('Expected decimal.Decimal, got %r'
                                             % (value,))
      return value

    def _db_set_value(self, v, p, value):
        value = str(value)
        v.set_stringvalue(value)
        if not self._indexed:
            p.set_meaning(entity_pb.Property.TEXT)

    def _db_get_value(self, v, _):
        if not v.has_stringvalue():
            return None
        value = v.stringvalue()
        return decimal.Decimal(value)

class BatchInProgress(ndb.Model):
    """Use a scheduler to delete batches in progress after a certain time"""

    started = ndb.DateTimeProperty(auto_now=True)

    def clean_up(self):
        qry = Shard.query().filter(Shard.batch_key == self.key)
        keys = qry.fetch(keys_only=True)
        while keys:
            ndb.delete_multi(keys)
            keys = qry.fetch(keys_only=True)

def cleanup_failed_batch(batch_key):
    batch = batch_key.get()

    if batch:
        batch.clean_up()
        batch.delete()

class Shard(ndb.Model):
    """Shards for each named counter"""

    counter_key = ndb.KeyProperty(name='c')
    batch_key = ndb.KeyProperty(name='b')
    count = DecimalProperty(name='v', default=decimal.Decimal('0.00'),
                            indexed=False)

class Counter(ndb.Model):
    """Tracks the number of shards for each named counter"""

    @property
    def shards(self):
        qry = Shard.query().filter(Shard.counter_key == self.key)
        results = qry.fetch(use_cache=False, use_memcache=False)
        return filter(None, results)

    @property
    def total(self):
        count = decimal.Decimal('0.00') # Use initial value if no shards

        for shard in self.shards:
            count += shard.count

        return count

    @ndb.tasklet
    def incr_async(self, value, batch_key):
        index = batch_key.id()
        name = self.key.id() + str(index)

        shard = Shard(id=name, count=value,
                      counter_key=self.key, batch_key=batch_key)

        yield shard.put_async(use_cache=False, use_memcache=False)

    def incr(self, *args, **kwargs):
        return self.incr_async(*args, **kwargs).get_result()

@ndb.tasklet
def increment_batch(data_set):
    batch_key = yield BatchInProgress().put_async()
    deferred.defer(cleanup_failed_batch, batch_key, _countdown=3600)

    # NOTE: mapping is modified in place, hence copying
    mapping = copy.copy(data_set)

    # (1/3) filter and fire off counter gets
    #       so the futures can autobatch
    counters = {}
    ctr_futs = {}
    ctr_put_futs = []
    zero_values = set()
    for name, value in mapping.iteritems():
        if value != decimal.Decimal('0.00'):
            ctr_fut = Counter.get_by_id_async(name) # Use cache(s)
            ctr_futs[name] = ctr_fut
        else:
            # Skip zero values because...
            zero_values.add(name)
            continue

    for name in zero_values:
        del mapping[name] # Remove all zero values from the mapping
    del zero_values

    while mapping: # Repeat until all transactions succeed

        # (2/3) wait on counter gets and fire off increment transactions
        #       this way autobatchers should fill time
        incr_futs = {}
        for name, value in mapping.iteritems():
            counter = counters.get(name)
            if not counter:
                counter = counters[name] = yield ctr_futs.pop(name)
            if not counter:
                logging.info('Creating new counter %s', name)
                counter = counters[name] = Counter(id=name)
                ctr_put_futs.append(counter.put_async())
            else:
                logging.debug('Reusing counter %s', name)
            incr_fut = counter.incr_async(value, batch_key)
            incr_futs[(name, value)] = incr_fut

        # (3/3) wait on increments and handle errors
        #       by using a tuple key for variable access
        for (name, value), incr_fut in incr_futs.iteritems():
            counter = counters[name]
            try:
                yield incr_fut
            except:
                pass
            else:
                del mapping[name]

        if mapping:
            logging.warning('%i increments failed this batch.' % len(mapping))

    yield batch_key.delete_async(), ctr_put_futs

    raise ndb.Return(counters.values())

class ShardTestHandler(webapp2.RequestHandler):

    @ndb.synctasklet
    def get(self):
        if self.request.GET.get('delete'):
            ndb.delete_multi_async(Shard.query().fetch(keys_only=True))
            ndb.delete_multi_async(Counter.query().fetch(keys_only=True))
            ndb.delete_multi_async(BatchInProgress.query().fetch(keys_only=True))
        else:
            data_set_test = {}
            for _ in xrange(TEST_BATCH_SIZE):
                name = ''
                for _ in xrange(TEST_NAME_LEN):
                    name += random.choice(string.letters)
                value = decimal.Decimal('{0:.2f}'.format(random.random() * 100))
                data_set_test[name] = value
            yield increment_batch(data_set_test)
        self.response.out.write("Done!")

app = webapp2.WSGIApplication([('/shard_test/', ShardTestHandler)], debug=True)
app = ndb.toplevel(app.__call__)

Ответ 2

В частности, по теме "Связанная транзакция истекла или уже недействительна" BadRequestError, это мало рекламируемый факт, что транзакции будут время намного быстрее, чем запрос. С момента создания вы получаете бесплатно 15 секунд жизни, после чего транзакция убивается, если она проводит 15 секунд подряд бездействия (так эффективный минимальный срок службы составляет 30 секунд), и он тяжело убит независимо от того, что после 60 секунд. Это затрудняет параллельное параллельное выполнение большого количества транзакций, поскольку конкуренция процессора и несправедливый алгоритм планирования таргетинга могут сговориться, чтобы некоторые транзакции простаивали слишком долго.

Следующий метод транзакции monkeypatch to ndb помогает бит, повторив истекшие транзакции, но в конечном итоге вам нужно настроить свою доработку, чтобы уменьшить количество конфликтов до уровней управления.

_ndb_context_transaction = ndb.Context.transaction

@ndb.tasklet
def _patched_transaction(self, callback, **ctx_options):
  if (self.in_transaction() and
      ctx_options.get('propagation') != ndb.TransactionOptions.INDEPENDENT):
    raise ndb.Return((yield _ndb_context_transaction(self, callback, **ctx_options)))

  attempts = 1
  start_time = time.time()
  me = random.getrandbits(16)
  logging.debug('Transaction started <%04x>', me)
  while True:
    try:
      result = yield _ndb_context_transaction(self, callback, **ctx_options)
    except datastore_errors.BadRequestError as e:
      if not ('expired' in str(e) and
              attempts < _MAX_BAD_REQUEST_RECOVERY_ATTEMPTS):
        raise
      logging.warning(
          'Transaction retrying <%04x> (attempt #%d, %.1f seconds) on BadRequestError: %s',
          me, attempts, time.time() - start_time, e)
      attempts += 1
    else:
      logging.debug(
          'Transaction finished <%04x> (attempt #%d, %.1f seconds)',
           me, attempts, time.time() - start_time)
      raise ndb.Return(result)

ndb.Context.transaction = _patched_transaction