Подтвердить что ты не робот

Как предотвратить дублирование сообщений SQS?

Каков наилучший способ предотвратить дублирование сообщений в Amazon SQS? У меня есть SQS доменов, ожидающих обхода. прежде чем я добавлю новый домен в SQS, я могу проверить с сохраненными данными, чтобы увидеть, был ли он недавно просканирован, чтобы предотвратить дубликаты.

Проблема с доменами, которые еще не были сканированы. Например, если в очереди, которая не была сканирована, 1000 доменов. Любая из этих ссылок может быть добавлена ​​снова и снова и снова. Который раздувает мой SQS сотням тысяч сообщений, которые в основном дублируются.

Как это предотвратить? Есть ли способ удалить все дубликаты из очереди? Или есть способ поиска очереди для сообщения, прежде чем я его добавлю? Я чувствую, что это проблема, которую должен испытать любой человек с SQS.

Один из вариантов, который я могу увидеть, - это хранить некоторые данные до того, как домен будет добавлен в SQS. Но если мне нужно хранить данные дважды, этот вид разрушает точку использования SQS в первую очередь.

4b9b3361

Ответ 1

Как упоминалось в других ответах, вы не можете предотвратить дублирование сообщений, поступающих из SQS.

В большинстве случаев ваши сообщения будут переданы одному из ваших потребителей один раз, но вы будете запускать дубликаты на определенном этапе.

Я не думаю, что есть простой ответ на этот вопрос, потому что он влечет за собой создание правильной архитектуры, которая может справиться с дубликатами, что означает ее идемпотентность в природе.

Если все рабочие в вашей распределенной архитектуре были идемпотентны, это было бы легко, потому что вам не нужно было бы беспокоиться о дубликатах. Но на самом деле такого рода среда не существует, где-то по пути что-то не сможет справиться с ней.

В настоящее время я работаю над проектом, где мне требуется решить эту проблему, и придумать подход к ее решению. Я думал, что это может принести пользу другим, чтобы поделиться своим мнением здесь. И это может быть хорошим местом, чтобы получить некоторые отзывы о моем мышлении.

Хранилище факсов

Это очень хорошая идея для развития сервисов, чтобы они собирали факты, которые теоретически могут воспроизводиться, чтобы воспроизвести одно и то же состояние во всех затронутых нижележащих системах.

Например, скажем, вы строите брокер сообщений для платформы торговли акциями. (Я на самом деле работал над таким проектом раньше, это было ужасно, но и хороший опыт обучения.)

Теперь скажем, что эта сделка приходит, и в ней заинтересованы 3 системы:

  • Старый мэйнфрейм школы, который должен оставаться обновленным.
  • Система, которая объединяет все сделки и делится ею с партнерами на FTP-сервере.
  • Служба, которая регистрирует сделку и перераспределяет акции новому владельцу

Это немного запутанно, я знаю, но идея состоит в том, что одно сообщение (факт), входящее в него, имеет различные распределенные последующие эффекты.

Теперь представьте, что мы поддерживаем магазин фактов, запись всех сделок, поступающих в наш брокер. И что все 3 провайдера обслуживания вниз по течению звонят нам, чтобы сообщить нам, что они потеряли все свои данные за последние 3 дня. Загрузка FTP на 3 дня позади, мейнфрейм на 3 дня позади, и все сделки отстают на 3 дня.

Поскольку у нас есть магазин фактов, мы могли бы теоретически воспроизвести все эти сообщения с определенного времени до определенного времени. В нашем примере это будет с 3 дней назад до сих пор. И нисходящие сервисы могут быть догнаны.

Этот пример может показаться немного верхним, но я пытаюсь передать что-то очень конкретное: факты - важные вещи, которые нужно отслеживать, потому что это то, что мы будем использовать в нашей архитектуре для сражения с дубликатами.

Как хранилище фактов помогает нам с повторяющимися сообщениями

При условии, что вы реализуете свой магазин фактов на уровне сохранения, который дает вам части CA теорему CAP, согласованность и доступность, вы можете сделать следующее:

Как только сообщение будет получено из очереди, вы проверите свой магазин фактов, видели ли вы это сообщение раньше, и если у вас есть, заблокирован ли он в данный момент и находится в состоянии ожидания. В моем случае я буду использовать MongoDB для реализации своего магазина фактов, так как мне это очень нравится, но различные другие технологии БД должны иметь возможность справиться с этим.

Если этот факт еще не существует, он будет вставлен в хранилище фактов с ожидающим состоянием и временем истечения срока блокировки. Это должно быть сделано с использованием атомных операций, потому что вы не хотите, чтобы это произошло дважды! Здесь вы гарантируете свою услугу idempotence.

Счастливый случай - чаще всего происходит

Когда Fact Store вернется к вашей службе, сообщив, что этого факта не существует, и что была создана блокировка, служба пытается это сделать. После этого он удалит сообщение SQS и отметит завершенный факт.

Дублирующее сообщение

Итак, что происходит, когда приходит сообщение, а это не дубликат. Но давайте посмотрим, когда приходит дублирующее сообщение. Служба подбирает его и просит магазин фактов записать его с помощью блокировки. Хранилище фактов сообщает, что оно уже существует и что оно заблокировано. Служба игнорирует сообщение и пропускает его! Как только обработка сообщений будет выполнена другим рабочим, он удалит это сообщение из очереди, и мы больше не увидим его.

Случай с бедствиями - редко бывает

Итак, что происходит, когда служба впервые регистрирует этот факт в магазине, затем получает блокировку на определенный период, но падает? Хорошо SQS снова представит вам сообщение, если оно было подобрано, но не удалено в течение определенного периода после его подачи из очереди. Вот почему мы кодируем наш магазин фактов таким образом, чтобы служба поддерживала блокировку в течение ограниченного времени. Поскольку, если он падает, мы хотим, чтобы SQS представила сообщение службе или другой экземпляр этого позднее, позволяя этой службе предположить, что этот факт должен быть включен в состояние (выполнено) еще раз.

Ответ 2

Нет способа уровня API, препятствующего отправке повторяющихся сообщений в очередь SQS. Вам нужно будет справиться с этим на уровне приложения, которого я боюсь.

Вы можете использовать таблицу DynamoDB для хранения ваших доменных имен, ожидающих обхода, и добавлять их только в очередь, если они не находятся в DynamoDB, например.

Ответ 3

Согласно Документам AWS, Exactly-Once Processing предоставляет способ избежать дублирования сообщений.

В отличие от стандартных очередей, очереди FIFO не содержат дубликатов Сообщения. Очереди FIFO помогают избежать отправки дубликатов в очередь. Если вы повторяете действие SendMessage в течение 5-минутной дедупликации В интервале Amazon SQS не вносит дубликатов в очередь.

Если ваша очередь является очередью FIFO и включает дублирование на основе содержимого, эту функцию можно использовать, чтобы избежать дублирования сообщений в течение интервала дедупликации. Для получения дополнительной информации прочитайте этот раздел и ссылку ниже.

https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-queues.html#cfn-sqs-queue-contentbaseddeduplication

Ответ 4

Поскольку вы не можете помешать SQS отправлять дублированные сообщения, вы должны реализовать это на своей стороне. Один простой способ сделать это - использовать пользователей Apache Camel idempotent, см. http://camel.apache.org/idempotent-consumer.html

Ответ 5

Amazon SQS представляет очереди FIFO с ровной обработкой и более низкими ценами для стандартных очередей

Использование идентификатора дедупликации сообщения Amazon SQS Сообщение ID дедупликации - это токен, используемый для дедупликации отправленных сообщений. Если отправлено сообщение с определенным идентификатором дедупликации сообщения успешно все сообщения, отправленные с тем же идентификатором дедупликации сообщения принимаются успешно, но не доставляются в течение 5 минут интервал дедупликации.

Amazon SQS представляет очереди FIFO

Использование идентификатора дедупликации сообщений Amazon SQS