Подтвердить что ты не робот

Лучший способ переместить сообщения с DLQ в Amazon SQS?

Какова наилучшая практика перемещения сообщений из очереди с мертвой буквой обратно в исходную очередь в Amazon SQS?

Будет ли это

  • Получить сообщение из DLQ
  • Запись сообщения в очередь
  • Удалить сообщение из DLQ

Или есть более простой способ?

Кроме того, будет ли AWS в конечном итоге иметь инструмент в консоли для перемещения сообщений с DLQ?

4b9b3361

Ответ 1

Это похоже на ваш лучший вариант. Существует вероятность того, что ваш процесс завершится с ошибкой после шага 2. В этом случае вы в конечном итоге скопируете сообщение дважды, но вы все равно должны обрабатывать повторную доставку сообщений (или не заботьтесь).

Ответ 2

Вот быстрый хак. Это определенно не лучший или рекомендуемый вариант.

  • Задайте основную очередь SQS как DLQ для фактического DLQ с максимальным приемом как 1.
  • Просмотр содержимого в DLQ (это переместит сообщения в основную очередь, так как это DLQ для фактического DLQ)
  • Удалите настройку, чтобы главная очередь больше не была DLQ фактического DLQ

Ответ 3

Не нужно перемещать сообщение, потому что оно будет сопряжено со многими другими проблемами, такими как дублирование сообщений, сценарии восстановления, потерянное сообщение, проверка дедупликации и т.д.

Вот решение, которое мы реализовали -

Обычно мы используем DLQ для временных ошибок, а не для постоянных ошибок. Так взял ниже подход -

  1. Прочитайте сообщение из DLQ как обычную очередь

    Выгоды
    • Чтобы избежать дублирования обработки сообщений
    • Лучший контроль над DLQ- Как я поставил проверку, чтобы обрабатывать только тогда, когда обычная очередь полностью обработана.
    • Масштабировать процесс на основе сообщения на DLQ
  2. Затем следуйте тому же коду, который следует обычной очереди.

  3. Более надежный в случае прерывания работы или если процесс был прерван во время обработки (например, экземпляр остановлен или процесс остановлен)

    Выгоды
    • Повторное использование кода
    • Обработка ошибок
    • Восстановление и воспроизведение сообщений
  4. Расширьте видимость сообщения, чтобы никакой другой поток не обрабатывал их.

    Выгода
    • Избегайте обработки одной и той же записи несколькими потоками.
  5. Удалите сообщение только в том случае, если произошла постоянная ошибка или успешно.

    Выгода
    • Продолжайте обработку, пока мы не получим временную ошибку.

Ответ 4

здесь:

import boto3
import sys
import Queue
import threading

work_queue = Queue.Queue()

sqs = boto3.resource('sqs')

from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)

def process_queue():
    while True:
        messages = work_queue.get()

        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})

        to_q.send_messages(Entries=bodies)

        for message in messages:
            print("Coppied " + str(message.body))
            message.delete()

for i in range(10):
     t = threading.Thread(target=process_queue)
     t.daemon = True
     t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    work_queue.put(messages)

work_queue.join()

Ответ 5

Для этого я написал небольшой скрипт на python, используя boto3 lib:

conf = {
  "sqs-access-key": "",
  "sqs-secret-key": "",
  "reader-sqs-queue": "",
  "writer-sqs-queue": "",
  "message-group-id": ""
}

import boto3
client = boto3.client(
    'sqs',
        aws_access_key_id       = conf.get('sqs-access-key'),
        aws_secret_access_key   = conf.get('sqs-secret-key')
)

while True:
    messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
            print(ret)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break

Вы можете получить этот скрипт по этой ссылке

этот скрипт в основном может перемещать сообщения между произвольными очередями. и он поддерживает очереди fifo, а также вы можете указать поле message_group_id.

Ответ 6

Есть еще один способ добиться этого без написания одной строки кода. Учтите, что ваше действительное имя очереди - SQS_Queue, а DLQ для него - SQS_DLQ. Теперь выполните следующие действия:

  1. Установите SQS_Queue как dlq для SQS_DLQ. Поскольку SQS_DLQ уже является dlq SQS_Queue. Теперь оба выступают в роли других.
  2. Установите максимальный счетчик приема вашего SQS_DLQ равным 1.
  3. Теперь читайте сообщения из консоли SQS_DLQ. Так как количество полученных сообщений равно 1, оно отправит все сообщения в свой собственный dlq, который является вашей действительной очередью SQS_Queue.

Ответ 7

Мы используем следующий скрипт для перенаправления сообщения из очереди src в очередь tgt:

имя файла: redrive.py

использование: python redrive.py -s {source queue name} -t {target queue name}

'''
This script is used to redrive message in (src) queue to (tgt) queue

The solution is to set the Target Queue as the Source Queue Dead Letter Queue.
Also set Source Queue redrive policy, Maximum Receives to 1. 
Also set Source Queue VisibilityTimeout to 5 seconds (a small period)
Then read data from the Source Queue.

Source Queue Redrive Policy will copy the message to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--src', required=True,
                        help='Name of source SQS')
    parser.add_argument('-t', '--tgt', required=True,
                        help='Name of targeted SQS')

    args = parser.parse_args()
    return args


def verify_queue(queue_name):
    queue_url = sqs.get_queue_url(QueueName=queue_name)
    return True if queue_url.get('QueueUrl') else False


def get_queue_attribute(queue_url):
    queue_attributes = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['All'])['Attributes']
    print(queue_attributes)

    return queue_attributes


def main():
    args = parse_args()
    for q in [args.src, args.tgt]:
        if not verify_queue(q):
            print(f"Cannot find {q} in AWS SQS")

    src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']

    target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
    target_queue_attributes = get_queue_attribute(target_queue_url)

    # Set the Source Queue Redrive policy
    redrive_policy = {
        'deadLetterTargetArn': target_queue_attributes['QueueArn'],
        'maxReceiveCount': '1'
    }
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '5',
            'RedrivePolicy': json.dumps(redrive_policy)
        }
    )
    get_queue_attribute(src_queue_url)

    # read all messages
    num_received = 0
    while True:
        try:
            resp = sqs.receive_message(
                QueueUrl=src_queue_url,
                MaxNumberOfMessages=10,
                AttributeNames=['All'],
                WaitTimeSeconds=5)

            num_message = len(resp.get('Messages', []))
            if not num_message:
                break

            num_received += num_message
        except Exception:
            break
    print(f"Redrive {num_received} messages")

    # Reset the Source Queue Redrive policy
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '30',
            'RedrivePolicy': ''
        }
    )
    get_queue_attribute(src_queue_url)


if __name__ == "__main__":
    main()