Подтвердить что ты не робот

Правильная обработка SQLAlchemy сеанса в многопоточных приложениях

Мне трудно понять, как правильно открывать и закрывать сеансы базы данных эффективно, как я понял в документации sqlalchemy, если я использую scoped_session для создания объекта Session, а затем использую возвращаемый объект Session для создания сеансов, он потокобезопасен, поэтому в основном каждый поток будет иметь свою собственную сессию, и проблем с ней не будет. Теперь приведен пример ниже: я помещаю его в бесконечный цикл, чтобы проверить, правильно ли он закрывает сеансы, и если я правильно его отслеживаю (в mysql, выполняя "SHOW PROCESSLIST;" ), соединения просто продолжают расти, они не закрывают их, хотя я использовал session.close() и даже удалял объект scoped_session в конце каждого прогона. Что я делаю не так? Моя цель в более крупном приложении - использовать минимальное количество подключений к базе данных, потому что моя текущая рабочая реализация создает новый сеанс в каждом методе, где требуется, и закрывает его перед возвратом, что кажется неэффективным.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel


DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname'


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
        self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
        self.DBSession = scoped_session(
            sessionmaker(
                autoflush=True,
                autocommit=False,
                bind=self.db_engine
            )
        )

    def _worker(self):
        db_session = self.DBSession()
        while True:
            try:
                task_id = self.task_queue.get(False)
                try:
                    item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
                    # do something with item
                except Exception as exc:
                    # if an error occurrs we skip it
                    continue

                finally:
                    db_session.commit()
                    self.task_queue.task_done()
            except QueueEmpty:
                db_session.close()
                return

    def start(self):
        try:
            db_session = self.DBSession()
            all_items = db_session.query(MyModel).all()
            for item in all_items:
                self.task_queue.put(item.id)

            for _i in range(self.worker_count):
                t = Thread(target=self._worker)
                t.start()

            self.task_queue.join()
        finally:
            db_session.close()
            self.DBSession.remove()


if __name__ == '__main__':
    while True:
        mt_worker = MTWorker(worker_count=50)
        mt_worker.start()
4b9b3361

Ответ 1

Вы должны звонить только create_engine и scoped_session один раз за процесса (для каждой базы данных). Каждый из них получит свой собственный пул соединений или сеансов (соответственно), поэтому вы хотите убедиться, что вы создаете только один пул. Просто сделайте его модульным уровнем глобальным. если вам нужно более эффективно управлять своими сеансами, вы, вероятно, не должны использовать scoped_session

Еще одно изменение заключается в том, чтобы использовать DBSession напрямую, как если бы это было сессия. методы вызова сеанса на scoped_session будут прозрачно при необходимости создайте поток-локальный сеанс и переадресуйте вызов метода на сессии.

Еще одна вещь, о которой нужно знать, - это pool_size  пула соединений, который по умолчанию - 5. Для многих приложений это хорошо, но если вы создаете много потоков, вам может потребоваться настроить этот параметр

DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname'
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
DBSession = scoped_session(
    sessionmaker(
        autoflush=True,
        autocommit=False,
        bind=db_engine
    )
)


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
# snip