Подтвердить что ты не робот

Очередь работы как таблица SQL с несколькими потребителями (PostgreSQL)

У меня типичная проблема производителя и потребителя:

Несколько приложений-производителей записывают запросы на задания в таблицу заданий в базе данных PostgreSQL.

Запросы заданий имеют поле состояния, которое начинается с QUEUED при создании.

Есть несколько потребительских приложений, которые уведомляются по правилу, когда производитель вставляет новую запись:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";

Они попытаются зарезервировать новую запись, установив ее состояние в положение RESERVED. Конечно, только на потребителя должно получиться. Все остальные потребители не должны резервировать одну и ту же запись. Вместо этого они должны резервировать другие записи с состоянием = QUEUED.

Пример: некоторые производители добавили следующие записи в таблицу jobrecord:

id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>

теперь два потребителя A, B хотят их обработать. Они начинают работать одновременно. Нужно зарезервировать идентификатор 1, другой должен зарезервировать id 2, тогда первый, кто закончит, должен зарезервировать id 3 и т.д.

В чистом многопоточном мире я бы использовал мьютекс для контроля доступа к очереди заданий, но потребители - это разные процессы, которые могут выполняться на разных машинах. Они получают доступ только к одной базе данных, поэтому вся синхронизация должна выполняться через базу данных.

Я прочитал много документации о параллельном доступе и блокировке в PostgreSQL, например. http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Выберите разблокированную строку в Postgresql PostgreSQL и блокировка

Из этих тем я узнал, что следующий SQL-оператор должен делать то, что мне нужно:

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord WHERE state = :queued 
        ORDER BY id  LIMIT 1 
     ) 
  RETURNING id;  // will only return an id when they reserved it successfully

К сожалению, когда я запускаю это в нескольких потребительских процессах, примерно в 50% случаев они по-прежнему сохраняют одну и ту же запись, обрабатывая ее и переписывая изменения другой.

Что мне не хватает? Как мне написать инструкцию SQL, чтобы несколько пользователей не оставляли одну и ту же запись?

4b9b3361

Ответ 2

Я использую postgres для очереди FIFO. Первоначально я использовал ACCESS EXCLUSIVE, что дает правильные результаты в высоком concurrency, но имеет неприятный эффект от взаимного исключения с pg_dump, который получает блокировку ACCESS SHARE во время ее выполнения. Это приводит к тому, что моя функция next() блокируется очень долго (длительность pg_dump). Это было неприемлемо, так как мы являемся магазином 24x7, и клиентам не нравилось мертвое время в очереди посреди ночи.

Я полагал, что должен быть менее ограничивающий замок, который по-прежнему будет одновременно устойчивым, а не заблокирован, пока работает pg_dump. Мой поиск привел меня к этому сообщению SO.

Затем я сделал некоторые исследования.

Для функции NEXT() очереди FIFO достаточно использовать следующие режимы, которые будут обновлять статус задания из очереди в очередь без каких-либо concurrency сбоев, а также не блокировать pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Query:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Результат выглядит так:

UPDATE 1
 job_id
--------
     98
(1 row)

Ниже приведена оболочка script, которая проверяет весь режим блокировки при высоком concurrency (30).

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

Код также здесь, если вы хотите изменить: https://gist.github.com/1083936

Я обновляю свое приложение, чтобы использовать режим EXCLUSIVE, так как он является самым ограничительным режимом, который a) правильный, и b) не конфликтует с pg_dump. Я выбрал наиболее ограничительный, поскольку он кажется наименее рискованным с точки зрения изменения приложения от ACCESS EXCLUSIVE, не являясь экспертом uber при блокировке postgres.

Я чувствую себя довольно комфортно с моей тестовой установкой и с общими идеями, стоящими за ответом. Я надеюсь, что совместное использование помогает решить эту проблему для других.

Ответ 3

Не нужно делать целую блокировку таблицы для этого: \.

Блокировка строк, созданная с помощью for update, отлично работает.

См. https://gist.github.com/mackross/a49b72ad8d24f7cefc32 для изменения, которое я сделал для ответа apinstein, и подтвердил, что он все еще работает.

Конечный код

update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1 for update
    )
returning job_id;

Ответ 6

Откроем PgQ вместо того, чтобы изобретать колесо.

Ответ 7

Хорошо, вот решение, которое работает для меня, основано на ссылке от Йордани. Поскольку некоторые из моих проблем были в том, как работает Qt-SQL, я включил Qt-код:

QSqlDatabase db = GetDatabase();
db.transaction();
QSqlQuery lockQuery(db);
bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
QSqlQuery query(db);
query.prepare(    
"UPDATE jobrecord "
"  SET \"owner\"= :owner, state = :reserved "
"  WHERE id = ( "
"    SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
"  ) RETURNING id;"
);
query.bindValue(":owner", pid);
query.bindValue(":reserved", JobRESERVED);
query.bindValue(":queued", JobQUEUED); 
bool result = query.exec();

Чтобы проверить, если несколько пользователей обрабатывают одно и то же задание, я добавил правило и таблицу журналов:

CREATE TABLE serverjobrecord_log
(
  serverjobrecord_id integer,
  oldowner text,
  newowner text
) WITH ( OIDS=FALSE );


CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
WHERE old.owner IS NOT NULL AND new.state = 1 
DO INSERT INTO jobrecord_log     (id, oldowner, newowner) 
    VALUES (new.id, old.owner, new.owner);

Без оператора LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; таблица журналов заполняет случайные записи, если один из пользователей перезаписал значения другого, но с помощью оператора LOCK таблица журналов остается пустой: -)