Подтвердить что ты не робот

Поиск определенных сообщений в SQS

Я знаю, что SQS для этого не строится, но мне любопытно, можно ли найти сообщения в очереди, удовлетворяющей некоторым критериям?

Я могу вытащить сообщения в цикле, искать тела сообщений для некоторого шаблона (даже не десериализовать их) и фильтровать нужные мне сообщения. Но тогда можно кончить бесконечным циклом - первые прочитанные сообщения вернутся в очередь к тому времени, когда я дойду до конца очереди...

Расширение видимости сообщений, но как узнать, сколько времени потребуется для сканирования всей очереди, и как долго я должен расширить видимость? Что делать, если у меня есть буквально десять тысяч сообщений?

Есть ли какое-нибудь обходное решение? Мне нужно отсканировать очередь для некоторых сообщений и удалить их...

4b9b3361

Ответ 1

Короткий ответ: нет.

Очереди предназначены для таких задач, как задачи. Машина захватывает новую задачу (то есть сообщение) из очереди, выполняет задачу и затем удаляет задачу.

Если вы пытаетесь найти сообщения для их фильтрации, я не могу не задаться вопросом, используете ли вы неправильный инструмент для работы...

Ответ 2

Я не думаю, что короткие или длинные ответы - "Нет".

Вот два противоположных решения, которые "Да".

  1. Обход очереди, ведение списка посещенных
  2. Использование корпоративных интеграционных шаблонов (маршрутизация сообщений) для разделения ваших сообщений на нисходящие потоки на основе критериев

1. Обход очереди, ведение списка посещенных

Рассмотрим случай очереди с сообщениями N, когда сообщения не добавляются и не удаляются. Без дополнительной информации (например, если вы знали, сколько сообщений должно соответствовать вашим критериям), вам необходимо просмотреть все сообщения N.

Ключевым моментом здесь является знание того, когда вы прошли все сообщения N. Здесь есть некоторые проблемы.

  1. Чтобы точно знать, вам нужно отслеживать сообщения по мере их добавления в очередь
  2. Чтобы узнать приблизительно, вы можете получить атрибут ApproximateNumberOfMessages очереди
  3. Или вы можете получать сообщения в цикле, поддерживая посещенный список, и предполагать, что в конечном итоге вы будете отбирать и исчерпывать сообщения с каждого сервера, в котором ваша очередь отбрасывается

Для ведения списка посещенных сообщений по мере получения сообщений и оценки критериев соответствия вы можете сохранить message_id всех посещенных сообщений.

ID сообщения почти уникальны. Смотрите эту тему

https://forums.aws.amazon.com/message.jspa?messageID=76119

Если вы пошли с (3), вы не были бы уверены, сколько итераций потребуется для исчерпания очереди. Однако, если вы выполняете это на неопределенный срок, вы гарантированно исчерпаете очередь, если взвешенное случайное распределение по серверам сегментов SQS дает им ненулевую вероятность.

2. Использование шаблонов интеграции предприятия (маршрутизация сообщений) для разделения ваших сообщений на нисходящие потоки на основе критериев

Если у вас есть контроль над архитектурой обмена сообщениями, вы можете использовать маршрутизатор сообщений в качестве "внешнего" обработчика сообщений, который отправляет сообщения различным получателям на основе критериев.

И, в частности, вы бы использовали контентно-ориентированный маршрутизатор:

http://www.enterpriseintegrationpatterns.com/patterns/messaging/ContentBasedRouter.html

Ответ 3

Несмотря на то, что при запросе конкретных атрибутов значение будет просто установлено равным null для сообщений, которые не содержат атрибута, вы все равно можете фильтровать таким образом. Те, у кого нет атрибута, установленного так, как вы хотите, могут иметь свою видимость в 1 и затем отпущены, поэтому они останутся в очереди. Дал бы грубый способ выполнения очереди очередей, хотя вы могли бы так же легко сделать то же самое на основе содержимого сообщения.

Ответ 4

Протестировано в разных случаях. это не работает. Ответ: NO

TestData​​STRONG >

public void fillQueueWithMessages(){

  MessageAttributeValue value1 = new MessageAttributeValue();
  value1.setDataType("String");
  value1.setStringValue("1");

  SendMessageRequest send_msg_request = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test1").addMessageAttributesEntry(value1.getStringValue(), value1);
  amazonSqs.sendMessage(send_msg_request);


  MessageAttributeValue value2 = new MessageAttributeValue();
  value2.setDataType("String");
  value2.setStringValue("2");


  SendMessageRequest send_msg_request2 = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test2").addMessageAttributesEntry(value2.getStringValue(), value2);
  amazonSqs.sendMessage(send_msg_request2);

  SendMessageRequest send_msg_request3 = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test3").addMessageAttributesEntry(value1.getStringValue(), value1);
  amazonSqs.sendMessage(send_msg_request3);

}

Test

public void shouldPollMessagesBasedOnMessageAttribute() throws InterruptedException {

ReceiveMessageRequest request =
    new ReceiveMessageRequest(env.getProperty("cloud.aws.sqs.readyForTranslation.url"));
request.setMaxNumberOfMessages(3);
request.setWaitTimeSeconds(20);
request.withMessageAttributeNames("1");

List<Message> messages = new ArrayList<Message>();

messages = amazonSqs.receiveMessage(request).getMessages();

assertEquals(2, messages.size());
}

Ответ 6

Давайте рассмотрим это с помощью некоторых примеров поэтому создайте сообщение 10 и отправьте его

// Send a message
for (int i = 0; i < 10; i++) {
    System.out.println("Sending a message to MyQueue.\n");
    Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    // extra code

    String sdate;
    Format formatter;
    Date date = new Date();

    // 2012-12-01
    formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
    sdate = formatter.format(date);
    System.out.println(sdate);

    messageAttributes.put("Datestamp"+i, new MessageAttributeValue().withDataType("String").withStringValue(sdate));

    Map<String, MessageAttributeValue> messageAttributes1 = new HashMap<>();
    messageAttributes1.put("attributeName", new MessageAttributeValue().withDataType("String").withStringValue(sdate));
    SendMessageRequest request = new SendMessageRequest();
    request.withMessageBody("A test message body."+sdate);
    request.withQueueUrl(myQueueUrl);
    request.withMessageAttributes(messageAttributes);
    sqs.sendMessage(request);
}

Теперь даже у вас есть 10 сообщений с datetimestamp1 до datetimestamp10
фильтрация с атрибутом не будет работать

позволяет попробовать фильтр с некоторым атрибутом myTag

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);

//ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);

receiveMessageRequest.withMaxNumberOfMessages(10);
receiveMessageRequest.withMessageAttributeNames("myTag");
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();

Он дает 10 сообщений, а значение myTag равно null

message.getMessageAttributes(). get ( "Datestamp" ) - null message.getMessageAttributes(). get ( "myTag" ) имеет значение null

Поэтому мы не можем фильтровать с атрибутом сообщения, как если бы этот ключ не был найден. ни один атрибут сообщения или атрибут "Все сообщения" не совпадают.

Так долго ответ: NOOOOO

Ответ 7

это на самом деле не все верно,

на самом деле вы можете "любопытно" фильтровать сообщения в очереди с использованием трюков атрибутов сообщений.

каждое сообщение может содержать атрибуты, которые вы можете добавить при создании сообщения (вам нужно будет предоставить 3 вещи для каждого атрибута: имя, тип, значение).

позже, когда вы создаете новый объект ReceiveMessageRequest, вы можете использовать "withMessageAttributeNames" для указания атрибута, и на самом деле происходит то, что ваша очередь фильтруется для сообщений, содержащих этот конкретный атрибут.

например:

String queueUrl = sqs.getQueueUrl("myQueue").getQueueUrl();

ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);

receiveRequest.withMaxNumberOfMessages(10);
receiveRequest.withMessageAttributeNames("myTag");

если ваша очередь имела 5 сообщений, но только 1 имел атрибут "myTag", тогда возвращается только тот, который будет указан.

это было для меня подавляющим, поскольку это не упоминается в ReceiveMessageRequest API

так что в основном все, что вам нужно сделать, это дать каждому сообщению уникальный атрибут (обратите внимание на пределы атрибутов: The message attribute name can contain the following characters: A-Z, a-z, 0-9, underscore (_), hyphen (-), and period (.). The name must not start or end with a period, and it should not have successive periods. The name is case sensitive and must be unique among all attribute names for the message. The name can be up to 256 characters long. The name cannot start with "AWS." or "Amazon." (or any variations in casing)