Подтвердить что ты не робот

RabbitMQ и каналы безопасности потоков Java

в этом руководстве https://www.rabbitmq.com/api-guide.html Ребята из RabbitMQ заявляют:

Каналы и Concurrency Соображения (безопасность потока)

Канальные экземпляры не должны делиться между потоками. Приложения должны предпочесть использовать канал для потока, а не использовать один и тот же канал для нескольких потоков. Хотя некоторые операции над каналами безопасны для одновременного вызова, некоторые из них не приводят к некорректному чередованию кадров на проводе. Совместное использование каналов между потоками также будет мешать работе * Publisher Confirms.

Безопасность потоков очень важна, поэтому я старался быть настолько старательным, насколько это возможно, но здесь проблема:

У меня есть это приложение, которое получает сообщения от Rabbit. Когда сообщение получено, оно обрабатывает его, а затем запускается, когда оно выполняется. Приложение может обрабатывать всего 2 элемента одновременно в фиксированном пуле потоков с двумя потоками. Предварительная выборка QOS для Rabbit равна 2, потому что я не хочу, чтобы приложение было больше, чем может обрабатывать в течение периода времени.

Теперь моя потребительская доставкаДоставка:

Task run = new Task(JSON.parse(message));    
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));

На этом этапе вы уже поняли, что TestWrapperThread выполняет вызов channel.basicAck(deliveryTag, false); как последнюю операцию.

По моему пониманию документации, это неверно и потенциально вредно, потому что канал не является потокобезопасным, и это поведение может повредить. Но как я должен это делать? Я имею в виду, у меня есть несколько идей, но они будут делать все более сложным, и я хотел бы понять, действительно ли это необходимо или нет.

Заранее спасибо

4b9b3361

Ответ 1

Я полагаю, что вы используете Channel только для своего потребителя, а не для других операций, таких как публикация и т.д.

В вашем случае единственная потенциальная проблема здесь:

channel.basicAck(deliveryTag, false);

потому что вы вызываете это через два потока, btw эта операция безопасна, если вы видите Java-код:

класс ChannelN.java вызывает:

public void basicAck(long deliveryTag, boolean multiple)
   throws IOException
{
   transmit(new Basic.Ack(deliveryTag, multiple));
}

см. код github для ChannelN.java

Метод transmit внутри AMQChannel использует:

public void transmit(Method m) throws IOException {
   synchronized (_channelMutex) {
       transmit(new AMQCommand(m));
   }
}

_channelMutex является protected final Object _channelMutex = new Object();

созданный с помощью класса. см. код github для AMQChannel.java

ИЗМЕНИТЬ

Как вы можете прочитать в официальной документации, "некоторые" операции потокобезопасны, теперь неясно, какие из них. Я изучил код, и я думаю, что нет проблем с вызовом ACK для большего количества потоков.

Надеюсь, это поможет.

EDIT2 Добавляю также комментарий Николаса:

Обратите внимание, что потребление (basicConsume) и acking из более чем одного потока - это общий шаблон rabbitmq, который уже используется java-клиентом.

Таким образом, вы можете использовать его в безопасности.