Подтвердить что ты не робот

Как обрабатывать соединения SQLAlchemy в ProcessPool?

У меня есть реактор, который извлекает сообщения от брокера RabbitMQ и запускает рабочие методы для обработки этих сообщений в пуле процессов, примерно так:

Реактор

Это реализовано с использованием python asyncio, loop.run_in_executor() и concurrent.futures.ProcessPoolExecutor.

Теперь я хочу получить доступ к базе данных в рабочих методах с помощью SQLAlchemy. В основном обработка будет очень простой и быстрой CRUD-операциями.

Вначале реактор обработает 10-50 сообщений в секунду, поэтому неприемлемо открывать новое соединение с базой данных для каждого запроса. Скорее, я хотел бы поддерживать одно постоянное соединение для каждого процесса.

Мои вопросы: как я могу это сделать? Могу ли я просто хранить их в глобальной переменной? Будет ли пул соединений SQA обработать это для меня? Как очистить, когда реактор остановится?

[Обновление]

  • База данных - это MySQL с InnoDB.

Зачем выбирать этот шаблон с пулом процессов?

В текущей реализации используется другой шаблон, в котором каждый потребитель работает в своем потоке. Так или иначе, это не очень хорошо. Есть уже около 200 потребителей, каждый из которых работает в своем потоке, и система быстро растет. Чтобы лучше масштабироваться, идея заключалась в том, чтобы разделить проблемы и потреблять сообщения в цикле ввода-вывода и делегировать обработку пулу. Конечно, производительность всей системы в основном связана с вводом-выводом. Однако CPU обрабатывает большие результирующие наборы.

Другая причина заключалась в "простоте использования". Хотя обработка соединений и потребление сообщений реализованы асинхронно, код рабочего может быть синхронным и простым.

Вскоре стало очевидным, что доступ к удаленным системам через постоянные сетевые подключения изнутри работника является проблемой. Это то, что предназначены для CommunicationChannels: внутри рабочего я могу передавать запросы на шину сообщений через эти каналы.

Одной из моих текущих идей является обращение к доступу к БД аналогичным образом: передайте инструкции через очередь в цикл событий, где они отправляются в БД. Однако я не знаю, как это сделать с помощью SQLAlchemy. Где будет точка входа? Объекты должны быть pickled, когда они передаются через очередь. Как получить такой объект из запроса SQA? Связь с базой данных должна работать асинхронно, чтобы не блокировать цикл события. Могу ли я использовать, например. aiomysql как драйвер базы данных для SQA?

4b9b3361

Ответ 1

Ваше требование подключения к одной базе данных в процессе процесса пула может быть легко удовлетворено, если вам будет оказана помощь в том, как вы создаете экземпляр session, предполагая, что работаете с ормом, в рабочем процессы.

Простое решение состоит в том, чтобы иметь глобальный сеанс, который вы повторно используете для разных запросов:

# db.py
engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
DBSession = scoped_session(sessionmaker(bind=engine)) 

И по рабочей задаче:

# task.py
from db import engine, DBSession
def task():
    DBSession.begin() # each task will get its own transaction over the global connection
    ...
    DBSession.query(...)
    ...
    DBSession.close() # cleanup on task end

Аргументы pool_size и max_overflow настроить по умолчанию QueuePool, используемой create_engine. pool_size будет гарантировать, что ваш процесс поддерживает только 1 подключение для каждого процесса в пуле процессов.

Если вы хотите повторно подключиться, вы можете использовать DBSession.remove(), который удалит сеанс из реестра и заставит его повторно подключиться к следующему использованию DBSession. Вы также можете использовать аргумент recycle Pool, чтобы восстановить соединение через указанный промежуток времени.

Во время разработки /debbuging вы можете использовать AssertionPool, что приведет к возникновению исключения, если из пула выведено более одного соединения, см. переключение реализаций пула о том, как это сделать.

Ответ 2

@roman: Хороший вызов у ​​вас там.

У меня есть аналогичный сценарий, прежде чем вот так вот мои 2 цента: если этот потребитель не только "прочитал" и "написал" сообщение, не выполнив никаких реальных действий, вы можете перепроектировать этого потребителя как потребителя/продюсер, который будет использовать это сообщение, он обработает сообщение, а затем поместит результат в другую очередь, эта очередь (обработанные сообщения, например,) может быть прочитана 1.NN без объединения асинхронных процессов, которые могли бы открыть соединение с БД в нем собственный жизненный цикл.

Я могу продлить свой ответ, но я не знаю, подходит ли этот подход для ваших нужд, если да, я могу дать вам более подробную информацию о расширенном дизайне.

Ответ 3

Подход, который мне очень понравился, - использовать веб-сервер для обработки и масштабирования пула процессов. flask-sqlalchemy даже в состоянии по умолчанию будет поддерживать пул соединений и не закрывать каждое соединение в каждом цикле ответа запроса.

Асинхронный исполнитель может просто вызвать конечные точки url для выполнения ваших функций. Дополнительным преимуществом является то, что, поскольку все процессы, выполняющие работу, находятся за URL-адресом, вы можете тривиально масштабировать свой рабочий пул по нескольким машинам, добавляя больше процессов через пушки или один из других методов, чтобы масштабировать простой сервер wsgi. Кроме того, вы получаете всю отказоустойчивую доброту.

Недостатком является то, что вы можете передавать больше информации по сети. Однако, как вы говорите, проблема связана с процессором, и вы, вероятно, будете передавать гораздо больше данных в базу данных и из нее.