Подтвердить что ты не робот

Как обрабатывать очередь SQS с помощью лямбда-функции (а не через запланированные события)?

Вот упрощенная схема, которую я пытаюсь сделать:

http-запросы → (API шлюза + лямбда A) → SQS → (лямбда B ) → DynamoDB

Таким образом, он должен работать, как показано: данные, поступающие из многих HTTP-запросов (например, до 500 в секунду) помещается в очередь SQS по моей лямбда-функции A. Затем другая функция B обрабатывает очередь: считывает до 10 элементов (на некоторой периодической основе) и записывает их в DynamoDB с BatchWriteItem.

Проблема в том, что я не могу понять, как запустить вторую лямбда-функцию. Его нужно вызывать часто, несколько раз в секунду (или, по крайней мере, один раз в секунду), потому что мне нужны все данные из очереди, чтобы попасть в DynamoDB ASAP (что почему вызов лямбда-функции B через запланированные события, как описано здесь не является вариантом)


Почему я не хочу писать напрямую в DynamoDB без SQS?

Мне было бы здорово избежать использования SQS. Проблема, с которой я пытаюсь обратиться в SQS, - это демпфирование DynamoDB. Даже не дросселируйте себя, а обрабатывайте его, записывая данные в DynamoDB с помощью AWS SDK: при записи записей один за другим и их дросселировании AWS SDK молча повторяет запись, что приводит к увеличению времени обработки запроса с точки http-клиента вид.

Итак, я хотел бы временно сохранить данные в очереди, отправить ответ "200 OK" обратно клиенту, а затем получить очередь, обрабатываемую отдельной функцией, запись нескольких записей одним вызовом DynamoDB BatchWriteItem (который возвращает необработанные элементы вместо автоматического повтора в случае дросселирования). Я даже предпочел бы потерять несколько записей вместо увеличения задержки между принимаемой записью и сохранением в DynamoDB Забастовкa >

UPD: Если кому-то интересно, я нашел способ сделать aws-sdk пропустить автоматические повторы в случае дросселирования: есть специальный параметр maxRetries. Во всяком случае, собирается использовать Кинезис, как предложено ниже

4b9b3361

Ответ 1

[Это напрямую не отвечает на ваш явный вопрос, поэтому, по моему опыту, он будет занижен:) Однако я отвечу на фундаментальную проблему, которую вы пытаетесь решить.]

То, как мы принимаем поток входящих запросов и передаем их в функции AWS Lambda для записи в динамическом режиме DynamoDB, заключается в замене SQS в предлагаемой архитектуре потоками Amazon Kinesis.

Потоки Kinesis могут управлять функциями LMS в AWS.

Потоки Kinesis гарантируют упорядочивание доставленных сообщений для любого заданного ключа (хорошо для операций с упорядоченной базой данных).

Потоки Kinesis позволяют указать, сколько функций AWMS Lambda может выполняться параллельно (по одному на раздел), что может быть согласовано с вашей емкостью записи DynamoDB.

Потоки Kinesis могут передавать несколько доступных сообщений в одном вызове функции AWS Lambda, что позволяет продолжить оптимизацию.

Примечание. Это действительно сервис AWS Lambda, который читает из потоков Amazon Kinesis, затем вызывает функцию, а не потоки Kinesis, напрямую вызывающие AWS Lambda; но иногда легче визуализировать, как это делает Кинезис. Результат для пользователя почти тот же.

Ответ 2

Вы не можете сделать это напрямую, интегрируя SQS и Lambda, к сожалению. Но пока не волнуйся. Есть решение! Вам нужно добавить еще одну услугу amazon в микс, и все ваши проблемы будут решены.

http requests --> (Gateway API + lambda A) --> SQS + SNS --> lambda B --> DynamoDB

Вы можете инициировать уведомление SNS второй лямбда-службе, чтобы ее выключить. Как только он запущен, он может слить очередь и записать все результаты в DynamoDB. Чтобы лучше понять возможные источники событий для Lambda, проверьте эти документы.

Ответ 3

Другим решением было бы просто добавить элемент в SQS, вызвать целевую функцию Lambda с событием, чтобы она была асинхронной.

Асинхронная Lambda может затем получить от SQS столько элементов, сколько вы хотите, и обработать их.

Я бы также добавил запланированный вызов асинхронной Lambda для обработки любых элементов в очереди, которая была ошибкой.

Ответ 4

Возможно, более экономичным решением было бы сохранить все в SQS (как есть), а затем запустить запланированное событие, которое вызывает многопоточную функцию Lambda, которая обрабатывает элементы из очереди?

Таким образом, ваш рабочий стол может точно соответствовать вашим лимитам. Если очередь пуста, функция может закончиться преждевременно или начать опрос в одном потоке.

Kinesis звучит как over-kill для этого случая - например, вам не нужен оригинальный порядок. Плюс запуск нескольких Lambdas одновременно, безусловно, дороже, чем запуск только одной многопоточной Lambda.

Ваша Лямбда будет касаться ввода/вывода, делая внешние вызовы AWS-сервисам, поэтому одна функция может быть очень хорошей.