Подтвердить что ты не робот

Force Oracle вернет TOP N строк с SKIP LOCKED

Есть несколько questions о том, как реализовать таблицу в виде очереди (конкретная блокировка строки, выбирая определенное количество из них и пропуская в настоящее время заблокированные строки) в Oracle и SQL Server.

Как я могу гарантировать, что я получаю строки с определенным номером (N), считая, что допустимо не менее N строк?

Из того, что я видел, Oracle применяет предикат WHERE, прежде чем определять, какие строки пропускать. Это означает, что если я хочу вытащить одну строку из таблицы, а два потока одновременно выполняют один и тот же SQL, один получит строку, а другой - пустой набор результатов (даже если есть более подходящие строки).

Это противоречит тому, как SQL Server обрабатывает подсказки блокировки UPDLOCK, ROWLOCK и READPAST. В SQL Server TOP, как представляется, волшебным образом ограничивает количество записей после успешного достижения блокировок.

Обратите внимание, две интересные статьи здесь и здесь.

ORACLE

CREATE TABLE QueueTest (
    ID NUMBER(10) NOT NULL,
    Locked NUMBER(1) NULL,
    Priority NUMBER(10) NOT NULL
);

ALTER TABLE QueueTest ADD CONSTRAINT PK_QueueTest PRIMARY KEY (ID);

CREATE INDEX IX_QueuePriority ON QueueTest(Priority);

INSERT INTO QueueTest (ID, Locked, Priority) VALUES (1, NULL, 4);
INSERT INTO QueueTest (ID, Locked, Priority) VALUES (2, NULL, 3);
INSERT INTO QueueTest (ID, Locked, Priority) VALUES (3, NULL, 2);
INSERT INTO QueueTest (ID, Locked, Priority) VALUES (4, NULL, 1);

В двух отдельных сеансах выполните:

SELECT qt.ID
FROM QueueTest qt
WHERE qt.ID IN (
    SELECT ID
    FROM
        (SELECT ID FROM QueueTest WHERE Locked IS NULL ORDER BY Priority)
    WHERE ROWNUM = 1)
FOR UPDATE SKIP LOCKED

Обратите внимание, что первый возвращает строку, а второй сеанс не возвращает строку:

Сессия 1

 ID
----
  4

Сессия 2

 ID
----

СЕРВЕР SQL

CREATE TABLE QueueTest (
    ID INT IDENTITY NOT NULL,
    Locked TINYINT NULL,
    Priority INT NOT NULL
);

ALTER TABLE QueueTest ADD CONSTRAINT PK_QueueTest PRIMARY KEY NONCLUSTERED (ID);

CREATE INDEX IX_QueuePriority ON QueueTest(Priority);

INSERT INTO QueueTest (Locked, Priority) VALUES (NULL, 4);
INSERT INTO QueueTest (Locked, Priority) VALUES (NULL, 3);
INSERT INTO QueueTest (Locked, Priority) VALUES (NULL, 2);
INSERT INTO QueueTest (Locked, Priority) VALUES (NULL, 1);

В двух отдельных сеансах выполните:

BEGIN TRANSACTION
SELECT TOP 1 qt.ID
FROM QueueTest qt
WITH (UPDLOCK, ROWLOCK, READPAST)
WHERE Locked IS NULL
ORDER BY Priority;

Обратите внимание, что оба сеанса возвращают другую строку.

Сессия 1

 ID
----
  4

Сессия 2

 ID
----
  3

Как я могу получить подобное поведение в Oracle?

4b9b3361

Ответ 1

"Из того, что я видел, Oracle применяет предикат WHERE, прежде чем определять, какие строки пропускать".

Угу. Это единственный возможный способ. Вы не можете пропустить строку из набора результатов, пока не определите набор результатов.

Ответ просто не ограничивает количество строк, возвращаемых оператором SELECT. Вы все равно можете использовать подсказки FIRST_ROWS_n, чтобы направить оптимизатор, чтобы вы не хватали полный набор данных.

Программное обеспечение, вызывающее SELECT, должно выбирать только первые n строк. В PL/SQL это будет

DECLARE
  CURSOR c_1 IS  
    SELECT /*+FIRST_ROWS_1*/ qt.ID
    FROM QueueTest qt
    WHERE Locked IS NULL
    ORDER BY PRIORITY
    FOR UPDATE SKIP LOCKED;
BEGIN
  OPEN c_1;
  FETCH c_1 into ....
  IF c_1%FOUND THEN
     ...
  END IF;
  CLOSE c_1;
END;

Ответ 2

Решение, выпущенное Gary Meyers, - это все, о чем я могу думать, кроме использования AQ, который делает все это для вас и многое другое.

Если вы действительно хотите избежать PLSQL, вы сможете перевести PLSQL в Java JDBC-вызовы. Все, что вам нужно сделать, - подготовить тот же оператор SQL, выполнить его, а затем продолжать делать выборки по одной строке (или выборки по N-строкам).

Документация Oracle на http://download.oracle.com/docs/cd/B10501_01/java.920/a96654/resltset.htm#1023642 дает некоторое представление о том, как это сделать на уровне инструкции:

Чтобы установить размер выборки для запроса, вызовите setFetchSize() для объекта оператора до выполнения запроса. Если вы установили размер выборки в N, то с каждой поездкой в ​​базу данных будут загружены N строк.

Итак, вы могли бы что-то программировать на Java, что выглядит примерно так (в псевдокоде):

stmt = Prepare('SELECT /*+FIRST_ROWS_1*/ qt.ID
FROM QueueTest qt
WHERE Locked IS NULL
ORDER BY PRIORITY
FOR UPDATE SKIP LOCKED');

stmt.setFetchSize(10);
stmt.execute();

batch := stmt.fetch();
foreach row in batch {
  -- process row
}
commit (to free the locks from the update)
stmt.close;

UPDATE

На основании приведенных ниже комментариев было предложено использовать ROWNUM для ограничения полученных результатов, но в этом случае это не сработает. Рассмотрим пример:

create table lock_test (c1 integer);

begin
  for i in 1..10 loop
    insert into lock_test values (11 - i);
  end loop;
  commit;
end;
/

Теперь у нас есть таблица с 10 строками. Обратите внимание, что я аккуратно вставил строки в обратном порядке, сначала строка содержит 10, затем 9 и т.д.

Скажите, что вам нужны первые 5 строк, упорядоченные по возрастанию - т.е. от 1 до 5. Ваша первая попытка:

select *
from lock_test
where rownum <= 5
order by c1 asc;

Что дает результаты:

C1
--
6
7
8
9 
10

Это явно неправильно, и это ошибка, которую делают почти все! Посмотрите план объяснения запроса:


| Id  | Operation           | Name      | Rows  | Bytes | Cost (%CPU)| Time     |
---------------------------------------------------------------------------------
|   0 | SELECT STATEMENT    |           |     5 |    65 |     4  (25)| 00:00:01 |
|   1 |  SORT ORDER BY      |           |     5 |    65 |     4  (25)| 00:00:01 |
|*  2 |   COUNT STOPKEY     |           |       |       |            |          |
|   3 |    TABLE ACCESS FULL| LOCK_TEST |    10 |   130 |     3   (0)| 00:00:01 |
---------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------

   2 - filter(ROWNUM<=5)

Oracle выполняет план снизу вверх - обратите внимание, что фильтр на rownum выполняется до сортировки, Oracle берет строки в том порядке, в котором они находятся (порядок они были вставлены здесь {10, 9, 8, 7, 6}), останавливается после того, как получает 5 строк, а затем сортирует их.

Итак, чтобы получить правильные первые 5, вам нужно сначала выполнить сортировку, а затем порядок, используя встроенное представление:

select * from
(
  select *
  from lock_test
  order by c1 asc
)
where rownum <= 5;

C1
--
1
2
3
4
5

Теперь, чтобы, наконец, дойти до сути - можете ли вы поместить пропущенное обновление в нужное место?

select * from
(
  select *
  from lock_test
  order by c1 asc
)
where rownum <= 5
for update skip locked;

Это дает ошибку:

ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc

Попытка переместить обновление для просмотра в представлении дает синтаксическую ошибку:

select * from
(
  select *
  from lock_test
  order by c1 asc
  for update skip locked
)
where rownum <= 5;

Единственное, что будет работать, это следующее: ДАЕТ НЕПРАВИЛЬНЫЙ РЕЗУЛЬТАТ:

  select *
  from lock_test
  where rownum <= 5
  order by c1 asc
  for update skip locked;

Infact, если вы запустите этот запрос в сеансе 1, а затем запустите его снова во втором сеансе, второй сеанс даст нулевые строки, что действительно действительно неправильно!

Так что вы можете сделать? Откройте курсор и выберите количество строк, которые вы хотите от него:

set serveroutput on

declare
  v_row lock_test%rowtype;
  cursor c_lock_test
  is
  select c1
  from lock_test
  order by c1
  for update skip locked;
begin
  open c_lock_test;
  fetch c_lock_test into v_row;
  dbms_output.put_line(v_row.c1);
  close c_lock_test;
end;
/    

Если вы запустите этот блок в сеансе 1, он распечатает "1", поскольку он закроет первую строку. Затем запустите его снова в сеансе 2, и он напечатает "2", когда он пропустил строку 1 и получил следующий свободный.

Этот пример находится в PLSQL, но с использованием setFetchSize в Java вы должны иметь возможность получить то же самое поведение.

Ответ 3

В первом сеансе при выполнении:

SELECT qt.ID
FROM QueueTest qt
WHERE qt.ID IN (
    SELECT ID
    FROM
        (SELECT ID FROM QueueTest WHERE Locked IS NULL ORDER BY Priority)
    WHERE ROWNUM = 1)
FOR UPDATE SKIP LOCKED

Ваша внутренняя попытка выбора захватить только id = 4 и заблокировать ее. Это успешно, потому что эта единственная строка еще не заблокирована.

Во втором сеансе ваш внутренний выбор STILL пытается захватить ТОЛЬКО id = 4 и заблокировать его. Это не будет успешным, поскольку первая строка все еще заблокирована с помощью первого сеанса.

Теперь, если вы обновили "заблокированное" поле в первом сеансе, следующий сеанс для запуска этого выбора будет захватывать id = 3.

В основном, в вашем примере вы зависите от флага, который не установлен. Чтобы использовать заблокированный флаг, вы, вероятно, хотите сделать что-то вроде:

  • выберите идентификаторы, которые вы хотите, на основе некоторых критериев.
  • Немедленное обновление заблокированного флага = 1 для этих идентификаторов (если ресурс занят, другой сеанс избил вас до этого шага для 1 или более идентификаторов, goto 1 снова)
  • Сделайте что-нибудь на этих идентификаторах
  • обновить флаг блокировки до нуля

Затем вы можете использовать команду select for update skip locked, поскольку ваш заблокированный флаг поддерживается.

Лично мне не нравятся все обновления флагов (ваше решение может потребовать их по любой причине), поэтому я бы просто попытался выбрать идентификаторы, которые я хочу обновить (по любым критериям) в каждом сеансе:

выберите * из queuetest, где... для обновления пропущена блокировка;

Например (на самом деле, мои критерии не будут основаны на списке идентификаторов, но таблица queuetest слишком упрощена):

  • sess 1: выберите * from queuetest, где id в (4,3) для блокировки обновления:

  • sess 2: выберите * из queuetest, где id в (4,3,2) для блокировки обновления;

Здесь sess1 блокирует 4,3, а sess2 будет блокировать только 2.

Вы не можете, насколько мне известно, сделать top-n или использовать group_by/order_by и т.д. в инструкции select for update, вы получите ORA-02014.

Ответ 4

Мое решение - написать хранимую процедуру следующим образом:

CREATE OR REPLACE FUNCTION selectQueue 
RETURN SYS_REFCURSOR
AS
  st_cursor SYS_REFCURSOR;
  rt_cursor SYS_REFCURSOR;
  i number(19, 0);

BEGIN

  open st_cursor for
  select id
  from my_queue_table
  for update skip locked;

  fetch st_cursor into i;
  close st_cursor;

  open rt_cursor for select i as id from dual;
  return  rt_cursor;

 END;

Это простой пример - возвращение TOP FIRST не заблокированной строки. Чтобы получить TOP N строк - замените одиночную выборку на локальную переменную ( "i" ) на выборку цикла в таблицу temp.

PS: возвращающий курсор - для спящего режима дружбы.

Ответ 5

Я встретил эту проблему, мы тратим много времени на ее решение. Некоторое использование for update for update skip locked, в oracle 12c новый метод заключается в использовании fetch first n rows only. Но мы используем оракула 11g.

Наконец, мы попробовали этот метод, и нашли хорошо работает.

CURSOR c_1 IS  
   SELECT *
     FROM QueueTest qt
     WHERE Locked IS NULL
     ORDER BY PRIORITY;
   myRow c_1%rowtype;
   i number(5):=0;
   returnNum := 10;
BEGIN
  OPEN c_1;
  loop 
    FETCH c_1 into myRow 
    exit when c_1%notFOUND 
    exit when i>=returnNum;
    update QueueTest set Locked='myLock' where id=myrow.id and locked is null;
    i := i + sql%rowcount;
  END
  CLOSE c_1;
  commit;
END;

Я пишу это в блокноте, так что что-то может быть не так, вы можете изменить это как процедуру или как-то еще.

Ответ 6

Во-первых, спасибо за ответы из топ-2. Изучите много из них. Я проверил следующий код и после запуска основного метода Practicedontdel.java обнаружил, что эти два класса каждый раз выводят разные строки. Пожалуйста, дайте мне знать, если в любом случае этот код может потерпеть неудачу. (PS: благодаря переполнению стека)

Practicedontdel.java:

    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs =null;
    String val="";
    int count =0;

        conn = getOracleConnection();
        conn.setAutoCommit(false);
        ps = prepareStatement(conn,"SELECT /*+FIRST_ROWS_3*/ t.* from 
        REPROCESS_QUEUE t FOR UPDATE SKIP LOCKED");
        ps.setFetchSize(3);
        boolean rss = ps.execute();
        rs = ps.getResultSet();
        new Practisethread().start();
        while(count<3 && rs.next())
        {
            val = rs.getString(1);
            System.out.println(val);
            count++;
            Thread.sleep(10000);
        }

Practisethread.java: в run():

            conn = getOracleConnection();
            conn.setAutoCommit(false);
            ps = prepareStatement(conn,"SELECT /*+FIRST_ROWS_3*/ t.* from REPROCESS_QUEUE t FOR UPDATE SKIP LOCKED");
            ps.setFetchSize(3);
            boolean rss = ps.execute();
            rs = ps.getResultSet();
            while(count<3 && rs.next())
            {
                val = rs.getString(1);
                System.out.println("******thread******");
                System.out.println(val);
                count++;
                Thread.sleep(5000);
            }
            conn.commit();
            System.out.println("end of thread program");