В нашей компании есть веб-сайт на основе Python и некоторые рабочие узлы на основе Python, которые взаимодействуют через Django/Celery и RabbitMQ. У меня есть приложение на основе Java, которое должно отправлять задания работникам, работающим на сельдере. Я могу отправить задания на RabbitMQ с Java просто отлично, но работники, основанные на сельдери, никогда не набирают работу. От взгляда на захват пакетов обоих типов представлений заданий есть различия, но я не могу понять, как их учитывать, потому что многие из них двоичные, что я не могу найти документацию об декодировании. Кто-нибудь здесь имеет какую-либо ссылку или опыт работы с совместной работой Java/RabbitMQ и Celery?
Взаимодействие с Django/Celery From Java
Ответ 1
Я нашел решение. Библиотека Java для RabbitMQ относится к обменам/очередям/ключам маршрута. В Celery имя очереди фактически сопоставляется с обменом, упомянутым в библиотеке Java. По умолчанию очередь для сельдерея - это просто "сельдерей". Если ваши настройки Django определяют очередь, называемую "myqueue", используя следующий синтаксис:
CELERY_ROUTES = {
'mypackage.myclass.runworker' : {'queue':'myqueue'},
}
Затем код на основе Java должен сделать что-то вроде следующего:
ConnectionFactory factory = new ConnectionFactory();
Connection connection = null ;
try {
connection = factory.newConnection(mqHost, mqPort);
} catch (IOException ioe) {
log.error("Unable to create new MQ connection from factory.", ioe) ;
}
Channel channel = null ;
try {
channel = connection.createChannel();
} catch (IOException ioe) {
log.error("Unable to create new channel for MQ connection.", ioe) ;
}
try {
channel.queueDeclare("celery", false, false, false, true, null);
} catch (IOException ioe) {
log.error("Unable to declare queue for MQ channel.", ioe) ;
}
try {
channel.exchangeDeclare("myqueue", "direct") ;
} catch (IOException ioe) {
log.error("Unable to declare exchange for MQ channel.", ioe) ;
}
try {
channel.queueBind("celery", "myqueue", "myqueue") ;
} catch (IOException ioe) {
log.error("Unable to bind queue for channel.", ioe) ;
}
// Generate the message body as a string here.
try {
channel.basicPublish(mqExchange, mqRouteKey,
new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null),
messageBody.getBytes("ASCII"));
} catch (IOException ioe) {
log.error("IOException encountered while trying to publish task via MQ.", ioe) ;
}
Оказывается, это просто разница в терминологии.