Подтвердить что ты не робот

Работа с дублирующимися первичными ключами при вставке в SQLAlchemy (декларативный стиль)

Мое приложение использует сеанс с областью действия и декларативный стиль SQLALchemy. Это веб-приложение, и многие вставки БД выполняются Celery, планировщиком задач.

Как правило, при принятии решения о вставке объекта мой код может выполнять что-то в следующих строках:

from schema import Session
from schema.models import Bike

pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
    new_bike = Bike(pk, "shiny", "bike")
    Session.add(new_bike)
    Session.commit()

Проблема здесь в том, что, поскольку многие из них выполняются асинхронными рабочими, возможно, что один работает на полпути, хотя вставляет Bike с id=123, а другой проверяет его существование. В этом случае второй рабочий попытается вставить строку с тем же самым первичным ключом, и SQLAlchemy поднимет значение IntegrityError.

Я не могу на всю жизнь найти хороший способ справиться с этой проблемой, кроме замены Session.commit() для:

'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())

def commit(ignore=False):
    try:
        Session.commit()
    except IntegrityError as e:
        reason = e.message
        logger.warning(reason)

        if not ignore:
            raise e

        if "Duplicate entry" in reason:
            logger.info("%s already in table." % e.params[0])
            Session.rollback()

И тогда везде у меня есть Session.commit Теперь у меня есть schema.commit(ignore=True), где я не против, чтобы строка не была вставлена ​​снова.

Для меня это кажется очень хрупким из-за проверки строки. Подобно FYI, когда создается IntegrityError, он выглядит так:

(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")

Итак, конечно, основным ключом, который я вставлял, было что-то вроде Duplicate entry is a cool thing, тогда я предполагаю, что могу пропустить IntegrityError, которые на самом деле не были из-за дублирования первичных ключей.

Существуют ли какие-либо более эффективные подходы, которые поддерживают чистый подход SQLAlchemy, который я использую (в отличие от того, чтобы начинать выписывать инструкции в строках и т.д.).

Db - это MySQL (хотя для модульного тестирования я хотел бы использовать SQLite и не хотел бы препятствовать этой способности любыми новыми подходами).

Ура!

4b9b3361

Ответ 1

Если вы используете session.merge(bike) вместо session.add(bike), то вы не будете генерировать ошибки первичного ключа. bike будет извлечен, обновлен или создан по мере необходимости.

Ответ 2

Вы должны обрабатывать каждый IntegrityError таким же образом: откат транзакции и, возможно, повторите попытку. Некоторые базы данных даже не позволят вам сделать что-то большее, чем после IntegrityError. Вы также можете получить блокировку на столе или более мелкую блокировку, если база данных позволяет это, в начале двух конфликтующих транзакций.

Использование оператора with для явного начала транзакции и автоматического фиксации (или откат для любого исключения):

from schema import Session
from schema.models import Bike

session = Session()
with session.begin():
    pk = 123 # primary key
    bike = session.query(Bike).filter_by(bike_id=pk).first()
    if not bike: # no bike in DB
        new_bike = Bike(pk, "shiny", "bike")
        session.add(new_bike)

Ответ 3

Я предполагаю, что ваши первичные ключи здесь естественны в некотором роде, поэтому вы не можете полагаться на обычные методы автоинкремента. Так что пусть проблема - это действительно один из уникальных столбцов, которые вам нужно вставить, что является более распространенным.

если вы хотите, чтобы "попытаться вставить, откат частично при сбое", вы используете SAVEPOINT, который с SQLAlchemy begin_nested(). следующий rollback() или commit() действует только на этот SAVEPOINT, а не на большее количество событий.

Однако общий шаблон здесь - это просто тот, которого действительно следует избегать. То, что вы действительно хотите здесь сделать, - одна из трех вещей. 1. Не выполняйте параллельные задания, которые обрабатывают те же ключи, которые необходимо вставить. 2. Синхронизируйте задания каким-либо образом с одновременными работающими ключами и 3. используйте некоторую общую службу для создания новых записей этого конкретного типа, совместно используемых заданиями (или убедитесь, что все они настроены до запуска заданий).

Если вы думаете об этом, № 2 имеет место в любом случае с высокой степенью изоляции. Начните две сессии postgres. Сессия 1:

test=> create table foo(id integer primary key);
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"
CREATE TABLE
test=> begin;
BEGIN
test=> insert into foo (id) values (1);

сеанс 2:

test=> begin;
BEGIN
test=> insert into foo(id) values(1);

то, что вы увидите, - блоки секций 2, поскольку строка с PK # 1 заблокирована. Я не уверен, что MySQL достаточно умен, чтобы сделать это, но это правильное поведение. Если OTOH вы пытаетесь вставить другой PK:

^CCancel request sent
ERROR:  canceling statement due to user request
test=> rollback;
ROLLBACK
test=> begin;
BEGIN
test=> insert into foo(id) values(2);
INSERT 0 1
test=> \q

он отлично работает без блокировки.

Дело в том, что если вы делаете такой вид конкуренции PK/UQ, ваши задачи по сельдеру будут сериализоваться в любом случае, или, по крайней мере, они должны быть.

Ответ 4

Вместо session.add(obj) вам нужно использовать приведенные ниже коды, это будет намного чище, и вам не нужно будет использовать пользовательскую функцию фиксации, как вы уже упоминали. Однако это будет игнорировать конфликт, но не только для дублирующего ключа, но и для других.

MySQL

 self.session.execute(insert(self.table, values=values, prefixes=['IGNORE']))

SQLite

self.session.execute(insert(self.table, values=values, prefixes=['OR IGNORE']))