Подтвердить что ты не робот

Выберите разблокированную строку в Postgresql

Есть ли способ выбрать строки в Postgresql, которые не заблокированы? У меня есть многопоточное приложение, которое будет делать:

Select... order by id desc limit 1 for update

на столе.

Если несколько потоков запускают этот запрос, они оба пытаются отбросить ту же строку.

Один получает блокировку строк, другие блоки и затем сбой после первого обновления строки. Мне бы очень хотелось, чтобы второй поток получил первую строку, которая соответствует предложению WHERE и еще не заблокирована.

Чтобы уточнить, я хочу, чтобы каждый поток сразу обновлял первую доступную строку после выбора.

Итак, если есть строки с ID: 1,2,3,4, первый поток будет включен, выберите строку с ID=4 и немедленно обновите ее.

Если во время этой транзакции приходит второй поток, я бы хотел, чтобы он получил строку с ID=3 и сразу же обновил эту строку.

Для Share это не будет выполнено ни с nowait, поскольку предложение WHERE будет соответствовать заблокированной строке (ID=4 in my example). В основном, что я хотел бы, это что-то вроде "AND NOT LOCKED" в предложении WHERE.

Users

-----------------------------------------
ID        | Name       |      flags
-----------------------------------------
1         |  bob       |        0
2         |  fred      |        1
3         |  tom       |        0
4         |  ed        |        0

Если запрос "Select ID from users where flags = 0 order by ID desc limit 1", и когда строка возвращается, следующая вещь - "Update Users set flags = 1 where ID = 0", тогда я хотел бы, чтобы первый поток в захвате строки с ID 4, а следующий в возьмите строку с ID 3.

Если я добавлю "For Update" к выбору, то первый поток получает строку, второй блокирует и затем ничего не возвращает, потому что, как только первая транзакция совершает предложение WHERE, больше не выполняется.

Если я не использую "For Update", тогда мне нужно добавить предложение WHERE для последующего обновления (WHERE flags = 0), поэтому только один поток может обновить строку.

Второй поток выберет ту же строку, что и первая, но второе обновление потока завершится неудачно.

В любом случае второй поток не может получить строку и обновление, потому что я не могу заставить базу данных дать строку 4 первому потоку и строке 3 ко второму потоку транзакций перекрываться.

4b9b3361

Ответ 2

Нет Нет NOOO: -)

Я знаю, что означает автор. У меня такая же ситуация, и я придумал хорошее решение. Сначала я начну с описания моей ситуации. У меня есть таблица i, в которой хранятся сообщения, которые нужно отправить в определенное время. PG не поддерживает синхронизацию функций, поэтому мы должны использовать демоны (или cron). Я использую специально написанный script, который открывает несколько параллельных процессов. Каждый процесс выбирает набор сообщений, которые нужно отправлять с точностью +1 сек /-1 сек. Сама таблица динамически обновляется новыми сообщениями.

Поэтому для каждого процесса необходимо загрузить набор строк. Этот набор строк не может быть загружен другим процессом, потому что это вызовет много беспорядков (некоторые люди получат пару сообщений, когда они должны получить только один). Вот почему нам нужно блокировать строки. Запрос на загрузку набора сообщений с помощью блокировки:

FOR messages in select * from public.messages where sendTime >= CURRENT_TIMESTAMP - '1 SECOND'::INTERVAL AND sendTime <= CURRENT_TIMESTAMP + '1 SECOND'::INTERVAL AND sent is FALSE FOR UPDATE LOOP
-- DO SMTH
END LOOP;

процесс с этим запросом запускается каждые 0,5 секунды. Таким образом, это приведет к следующему запросу, ожидающему первой блокировки, чтобы разблокировать строки. Такой подход создает огромные задержки. Даже когда мы используем NOWAIT, запрос приведет к исключению, которое нам не нужно, поскольку в таблице могут быть отправлены новые сообщения. Если использовать просто FOR SHARE, запрос будет выполнен правильно, но все равно потребуется много времени, создавая огромные задержки.

Чтобы сделать это, мы делаем немного магии:

  • изменение запроса:

    FOR messages in select * from public.messages where sendTime >= CURRENT_TIMESTAMP - '1 SECOND'::INTERVAL AND sendTime <= CURRENT_TIMESTAMP + '1 SECOND'::INTERVAL AND sent is FALSE AND is_locked(msg_id) IS FALSE FOR SHARE LOOP
    -- DO SMTH
    END LOOP;
    
  • таинственная функция is_locked (msg_id) выглядит следующим образом:

    CREATE OR REPLACE FUNCTION is_locked(integer) RETURNS BOOLEAN AS $$
    DECLARE
        id integer;
        checkout_id integer;
        is_it boolean;
    BEGIN
        checkout_id := $1;
        is_it := FALSE;
    
        BEGIN
            -- we use FOR UPDATE to attempt a lock and NOWAIT to get the error immediately 
            id := msg_id FROM public.messages WHERE msg_id = checkout_id FOR UPDATE NOWAIT;
            EXCEPTION
                WHEN lock_not_available THEN
                    is_it := TRUE;
        END;
    
        RETURN is_it;
    
    END;
    $$ LANGUAGE 'plpgsql' VOLATILE COST 100;
    

Конечно, мы можем настроить эту функцию для работы с любой таблицей, имеющейся в вашей базе данных. На мой взгляд, лучше создать одну функцию проверки для одной таблицы. Добавление большего количества функций к этой функции может сделать ее только медленнее. Мне требуется больше времени, чтобы проверить это предложение, так что нет необходимости делать это еще медленнее. Для меня это полное решение, и оно отлично работает.

Теперь, когда у меня 50 процессов, работающих параллельно, каждый процесс имеет уникальный набор свежих сообщений для отправки. После отправки я просто обновляю строку с отправленным = ИСТИНА и никогда не вернусь к ней снова.

Я надеюсь, что это решение также сработает для вас (автор). Если у вас есть какие-либо вопросы, просто сообщите мне: -)

О, и дайте мне знать, если это сработало для вас как-хорошо.

Ответ 3

Я использую что-то вроде этого:

select  *
into l_sms
from sms
where prefix_id = l_prefix_id
    and invoice_id is null
    and pg_try_advisory_lock(sms_id)
order by suffix
limit 1;

и не забудьте вызвать pg_advisory_unlock

Ответ 4

Если вы пытаетесь реализовать очередь, взгляните на PGQ, который уже решил эту и другие проблемы. http://wiki.postgresql.org/wiki/PGQ_Tutorial

Ответ 5

Похоже, вы пытаетесь сделать что-то вроде захвата элемента с наивысшим приоритетом в очереди, которая еще не рассматривается другим процессом.

Вероятным решением является добавление предложения where, ограничивающего его необработанными запросами:

select * from queue where flag=0 order by id desc for update;
update queue set flag=1 where id=:id;
--if you really want the lock:
select * from queue where id=:id for update;
...

Надеемся, что вторая транзакция будет заблокирована, пока произойдет обновление до флага, тогда она сможет продолжить, но флаг будет ограничивать ее следующей строкой.

Вероятно также, что с использованием сериализуемого уровня изоляции вы можете получить желаемый результат без всякого этого безумия.

В зависимости от характера вашего приложения могут быть лучшие способы его реализации, чем в базе данных, например, в FIFO или LIFO-канале. Кроме того, может быть возможно изменить порядок, в котором они вам нужны, и использовать последовательность, чтобы гарантировать, что они будут обрабатываться последовательно.

Ответ 6

Это может быть выполнено с помощью SELECT... NOWAIT; пример здесь.

Ответ 7

Похоже, вы ищете SELECT FOR SHARE.

http://www.postgresql.org/docs/8.3/interactive/sql-select.html#SQL-FOR-UPDATE-SHARE

FOR SHARE ведет себя аналогично, за исключением того, что он получает общую, а не эксклюзивную блокировку для каждой полученной строки. Общий блокировщик блокирует другие транзакции от выполнения UPDATE, DELETE или SELECT FOR UPDATE в этих строках, но это не мешает им выполнять SELECT FOR SHARE.

Если определенные таблицы называются в FOR UPDATE или FOR SHARE, тогда блокируются только строки из этих таблиц; любые другие таблицы, используемые в SELECT, просто читаются, как обычно. Предложение FOR UPDATE или FOR SHARE без списка таблиц влияет на все таблицы, используемые в команде. Если FOR UPDATE или FOR SHARE применяется к представлению или подзапросу, это влияет на все таблицы, используемые в представлении или подзапросе.

Несколько предложений FOR UPDATE и FOR SHARE могут быть записаны, если необходимо указать разные правила блокировки для разных таблиц. Если одна и та же таблица упоминается (или неявно затронута) с помощью предложений FOR UPDATE и FOR SHARE, то она обрабатывается как FOR UPDATE. Аналогично, таблица обрабатывается как NOWAIT, если она указана в любом из предложений, влияющих на нее.

FOR UPDATE и FOR SHARE не могут использоваться в контекстах, где возвращенные строки не могут быть четко идентифицированы с отдельными строками таблицы; например, они не могут использоваться с агрегацией.

Ответ 8

Чего вы пытаетесь достичь? Можете ли вы лучше объяснить, почему ни разблокированные обновления строк, ни полные транзакции не будут делать то, что вы хотите?

Еще лучше, можете ли вы предотвратить конфликт и просто иметь в каждом потоке другое смещение? Это не будет работать хорошо, если соответствующая часть таблицы будет часто обновляться; у вас все еще будут столкновения, но только при большой нагрузке на вставку.

Select... order by id desc offset THREAD_NUMBER limit 1 for update

Ответ 9

Поскольку я еще не нашел лучшего ответа, я решил использовать блокировку в своем приложении для синхронизации доступа к коду, который выполняет этот запрос.

Ответ 10

Как насчет следующего? Он может обрабатываться более атомарно, чем другие примеры, но должен быть проверен еще, чтобы убедиться, что мои предположения не ошибаются.

UPDATE users SET flags = 1 WHERE id = ( SELECT id FROM users WHERE flags = 0 ORDER BY id DESC LIMIT 1 ) RETURNING ...;

Вероятно, вы все равно будете придерживаться любой схемы блокировки, которую postgres использует внутри, для обеспечения согласованных результатов SELECT перед одновременными UPDATE.

Ответ 11

Я столкнулся с той же проблемой в нашем приложении и придумал решение, очень похожее на подход Гранта Джонсона. Канал FIFO или LIFO не был вариантом, потому что у нас есть кластер серверов приложений, обращающихся к одной БД. То, что мы делаем, - это

SELECT ... WHERE FLAG=0 ... FOR UPDATE
, после которого следует
UPDATE ... SET FLAG=1 WHERE ID=:id
, как можно скорее, чтобы время блокировки было как можно меньше. В зависимости от количества столбцов и размеров столбца это может помочь получить только идентификатор в первом выборе и после того, как вы пометили строку для извлечения оставшихся данных. Хранимая процедура может еще больше уменьшить количество круговых поездок.

Ответ 12

^^, который работает. подумайте о наличии "немедленного" статуса "заблокированного".

Скажем, ваша таблица такова:

id | имя | фамилия | статус

Иными состояниями, например, являются: 1 = ожидающий, 2 = заблокированный, 3 = обработанный, 4 = отказ, 5 = отклоненный

Каждая новая запись вставлена ​​со статусом pending (1)

Ваша программа выполняет следующие действия: "обновить статус набора mytable = 2, где id = (выберите id из mytable, где имя типа" % John% "и статус = 1 предел 1) возвращает идентификатор, имя, фамилию"

Тогда ваша программа выполняет свою задачу, и если она придумает вывод о том, что этот поток не должен обрабатывать эту строку вообще, он делает следующее: "update mytable set status = 1 где id =?"

В противном случае он обновляется до других статусов.

Ответ 13

Мое решение состоит в том, чтобы использовать инструкцию UPDATE с предложением RETURNING.

Users

-----------------------------------
ID        | Name       |      flags
-----------------------------------
1         |  bob       |        0  
2         |  fred      |        1  
3         |  tom       |        0   
4         |  ed        |        0   

Вместо SELECT .. FOR UPDATE используйте

BEGIN; 

UPDATE "Users"
SET ...
WHERE ...;
RETURNING ( column list );

COMMIT;

Поскольку оператор UPDATE получает блокировку ROW EXCLUSIVE в таблице, ее обновление вы получаете сериализованные обновления. Чтения по-прежнему разрешены, но они видят только данные перед началом транзакции UPDATE.

Ссылка: Concurrency Контроль Глава Pg docs.

Ответ 14

Используется в многопоточном и кластере?
Как насчет этого?

START TRANSACTION;

// All thread retrive same task list
// If result count is very big, using cursor 
//    or callback interface provied by ORM frameworks.
var ids = SELECT id FROM tableName WHERE k1=v1;

// Each thread get an unlocked recored to process.
for ( id in ids ) {
   var rec = SELECT ... FROM tableName WHERE id =#id# FOR UPDATE NOWAIT;
   if ( rec != null ) {
    ... // do something
   }
}

COMMIT;