У меня есть реактор, который извлекает сообщения от брокера RabbitMQ и запускает рабочие методы для обработки этих сообщений в пуле процессов, примерно так:
Это реализовано с использованием python asyncio
, loop.run_in_executor()
и concurrent.futures.ProcessPoolExecutor
.
Теперь я хочу получить доступ к базе данных в рабочих методах с помощью SQLAlchemy. В основном обработка будет очень простой и быстрой CRUD-операциями.
Вначале реактор обработает 10-50 сообщений в секунду, поэтому неприемлемо открывать новое соединение с базой данных для каждого запроса. Скорее, я хотел бы поддерживать одно постоянное соединение для каждого процесса.
Мои вопросы: как я могу это сделать? Могу ли я просто хранить их в глобальной переменной? Будет ли пул соединений SQA обработать это для меня? Как очистить, когда реактор остановится?
[Обновление]
- База данных - это MySQL с InnoDB.
Зачем выбирать этот шаблон с пулом процессов?
В текущей реализации используется другой шаблон, в котором каждый потребитель работает в своем потоке. Так или иначе, это не очень хорошо. Есть уже около 200 потребителей, каждый из которых работает в своем потоке, и система быстро растет. Чтобы лучше масштабироваться, идея заключалась в том, чтобы разделить проблемы и потреблять сообщения в цикле ввода-вывода и делегировать обработку пулу. Конечно, производительность всей системы в основном связана с вводом-выводом. Однако CPU обрабатывает большие результирующие наборы.
Другая причина заключалась в "простоте использования". Хотя обработка соединений и потребление сообщений реализованы асинхронно, код рабочего может быть синхронным и простым.
Вскоре стало очевидным, что доступ к удаленным системам через постоянные сетевые подключения изнутри работника является проблемой. Это то, что предназначены для CommunicationChannels: внутри рабочего я могу передавать запросы на шину сообщений через эти каналы.
Одной из моих текущих идей является обращение к доступу к БД аналогичным образом: передайте инструкции через очередь в цикл событий, где они отправляются в БД. Однако я не знаю, как это сделать с помощью SQLAlchemy.
Где будет точка входа?
Объекты должны быть pickled
, когда они передаются через очередь. Как получить такой объект из запроса SQA?
Связь с базой данных должна работать асинхронно, чтобы не блокировать цикл события. Могу ли я использовать, например. aiomysql как драйвер базы данных для SQA?