Подтвердить что ты не робот

Трубы и фильтры на уровне СУБД: разделение выходного потока MERGE

Сценарий

У нас есть довольно стандартный процесс импорта данных, в котором мы загружаем staging, затем MERGE в таблицу target.

Новые требования (зеленый) включают захват подмножества импортированных данных в отдельную таблицу queue для полностью несвязанной обработки.

Схема сценария

"Задача"

(1) Подмножество состоит из набора записей: те, которые были только что вставлен в таблицу target.

(2) Подмножество является проекцией некоторых вставленных столбцов, но также по крайней мере, один столбец, который присутствует только в источнике (staging таблицу).

(3) Оператор MERGE уже использует предложение OUTPUT..INTO строго записать $action, взятый на MERGE, чтобы мы могли PIVOT результат и COUNT количество вставок, обновлений и исключений для целей статистики. Нам действительно не нравится буферизация действия для всего набора данных, подобные этому, и предпочли бы агрегировать суммы на лету. Излишне говорить, что мы не хотим добавлять больше данных для это таблица OUTPUT.

(4) Мы не хотим выполнять соответствующую работу, чтобы MERGE второй раз по какой-либо причине, даже частично. target таблица действительно большая, мы не можем индексировать все, и операция обычно довольно дорога (минуты, а не секунды).

(5) Мы не рассматриваем возможность округления любых результатов от MERGE до клиент, чтобы клиент мог перенаправить его на queue на немедленно отправив его обратно. Данные должны оставаться на сервере.

(6) Мы хотим избежать буферизации всего набора данных во временном хранилище между staging и queue.

Каким будет лучший способ этого?

Неудачи

(a) Требование о регистрации только вставленных записей предотвращает нас от ориентации таблицы queue непосредственно в предложении OUTPUT..INTO MERGE, поскольку это не допускает предложения WHERE. Мы можем использовать некоторые CASE обман, чтобы отметить нежелательные записи для последующего удаления из queue без обработки, но это кажется сумасшедшим.

(b) Поскольку некоторые столбцы, предназначенные для queue, не отображаются в target, мы не можем просто добавить триггер ввода в цель таблицу для загрузки queue. "Расщепление потока данных" должно произойти раньше.

(c) Поскольку мы уже использовали предложение OUTPUT..INTO в MERGE, мы не может добавить второе предложение OUTPUT и вставить MERGE в INSERT..SELECT, чтобы загрузить очередь. Это позор, потому что это чувствует себя как совершенно произвольное ограничение для того, что работает очень хорошо в противном случае; SELECT фильтрует только записи с $action мы хотим (INSERT) и INSERT их в queue в одном выражение. Таким образом, СУБД теоретически может избежать буферизации всего dataset и просто поместите его в queue. (Примечание: мы не преследовали и, вероятно, это фактически не оптимизировало план таким образом.)

Ситуация

Мы чувствуем, что исчерпали наши возможности, но решили обратиться к hivemind быть уверенным. Все, что мы можем придумать, это:

(S1) Создайте таблицу VIEW таблицы target, которая также содержит значение NULL столбцы для данных, предназначенных только для queue, и Оператор SELECT определяет их как NULL. Затем установите INSTEAD OF триггеры, которые заполняют таблицу target и queue соответственно. Наконец, подключите MERGE, чтобы настроить таргетинг на представление. Эта работает, но мы не поклонники конструкции - это определенно выглядит сложнее.

(S2) Откажитесь, буферизируйте весь набор данных во временной таблице, используя другой MERGE..OUTPUT. После MERGE немедленно скопируйте данные (снова!) из временной таблицы в queue.

4b9b3361

Ответ 1

Я понимаю, что основным препятствием является ограничение предложения OUTPUT в SQL Server. Он позволяет одному OUTPUT INTO table и/или одному OUTPUT возвращать результирующий набор для вызывающего.

Вы хотите сохранить результат оператора MERGE двумя способами:

  • все строки, на которые повлиял MERGE для сбора статистики
  • только вставленные строки для queue

Простой вариант

Я бы использовал ваше решение S2. По крайней мере, для начала. Его легко понять и поддерживать, и он должен быть достаточно эффективным, потому что самая ресурсоемкая операция (MERGE в Target сама будет выполняться только один раз). Ниже приведен второй вариант, и было бы интересно сравнить их производительность с реальными данными.

Итак:

  • Используйте OUTPUT INTO @TempTable в MERGE
  • Либо INSERT все строки из @TempTable в Stats или агрегат перед вставкой. Если вам нужна сводная статистика, имеет смысл объединить результаты этой партии и объединить ее в окончательный Stats вместо копирования всех строк.
  • INSERT в queue только "вставленные" строки из @TempTable.

Я возьму образцы данных из ответа @i-one.

Схема

-- I'll return to commented lines later

CREATE TABLE [dbo].[TestTarget](
    -- [ID] [int] IDENTITY(1,1) NOT NULL,
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL
);

CREATE TABLE [dbo].[TestStaging](
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL,
    [baz] [varchar](10) NULL
);

CREATE TABLE [dbo].[TestStats](
    [MergeAction] [nvarchar](10) NOT NULL
);

CREATE TABLE [dbo].[TestQueue](
    -- [TargetID] [int] NOT NULL,
    [foo] [varchar](10) NULL,
    [baz] [varchar](10) NULL
);

Примеры данных

TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];

INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
VALUES
    ('A', 'A_'),
    ('B', 'B?');

Объединить

DECLARE @TempTable TABLE (
    MergeAction nvarchar(10) NOT NULL,
    foo varchar(10) NULL,
    baz varchar(10) NULL);

MERGE INTO TestTarget AS Dst
USING TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
    Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action AS MergeAction, inserted.foo, Src.baz
INTO @TempTable(MergeAction, foo, baz)
;

INSERT INTO [dbo].[TestStats] (MergeAction)
SELECT T.MergeAction
FROM @TempTable AS T;

INSERT INTO [dbo].[TestQueue]
    ([foo]
    ,[baz])
SELECT
    T.foo
    ,T.baz
FROM @TempTable AS T
WHERE T.MergeAction = 'INSERT'
;

SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];

Результат

TestTarget
+-----+-----+
| foo | bar |
+-----+-----+
| A   | AA  |
| B   | BB  |
| C   | CC  |
+-----+-----+

TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT      |
| UPDATE      |
| UPDATE      |
+-------------+

TestQueue
+-----+-----+
| foo | baz |
+-----+-----+
| C   | CCC |
+-----+-----+

Второй вариант

Протестировано на SQL Server 2014 Express.

OUTPUT может отправлять свой результирующий набор в таблицу и вызывающую. Таким образом, OUTPUT INTO может напрямую войти в Stats, и если мы завершим оператор MERGE в хранимую процедуру, то мы можем использовать INSERT ... EXEC в queue.

Если вы рассмотрите план выполнения, вы увидите, что INSERT ... EXEC создает временную таблицу за кулисами (см. также Скрытые затраты INSERT EXEC на Adam Machanic), поэтому я ожидаю, что общая производительность будет похожа на первый вариант, когда вы создаете временную таблицу явно.

Еще одна проблема для решения: queue таблица должна иметь только "вставленные" строки, а не все строки. Для этого вы можете использовать триггер в таблице queue для удаления строк, отличных от "вставленных". Еще одна возможность состоит в том, чтобы определить уникальный индекс с помощью IGNORE_DUP_KEY = ON и подготовить данные таким образом, чтобы "не вставленные" строки нарушали уникальный индекс и не включались в таблицу.

Итак, я добавлю столбец ID IDENTITY в таблицу Target, и я добавлю столбец TargetID в таблицу queue. (Раскомментируйте их в script выше). Кроме того, я добавлю индекс в таблицу queue:

CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
(
    [TargetID] ASC
) WITH (
PAD_INDEX = OFF, 
STATISTICS_NORECOMPUTE = OFF, 
SORT_IN_TEMPDB = OFF, 
IGNORE_DUP_KEY = ON, 
DROP_EXISTING = OFF, 
ONLINE = OFF, 
ALLOW_ROW_LOCKS = ON, 
ALLOW_PAGE_LOCKS = ON)

Важная часть - UNIQUE и IGNORE_DUP_KEY = ON.

Вот хранимая процедура для MERGE:

CREATE PROCEDURE [dbo].[TestMerge]
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    MERGE INTO dbo.TestTarget AS Dst
    USING dbo.TestStaging AS Src
    ON Dst.foo = Src.foo
    WHEN MATCHED THEN
    UPDATE SET
        Dst.bar = Src.bar
    WHEN NOT MATCHED BY TARGET THEN
    INSERT (foo, bar)
    VALUES (Src.foo, Src.bar)
    OUTPUT $action INTO dbo.TestStats(MergeAction)
    OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID, 
    inserted.foo,
    Src.baz
    ;

END

Использование

TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];

-- Make sure that `Queue` has one special row with TargetID=0 in advance.
INSERT INTO [dbo].[TestQueue]
    ([TargetID]
    ,[foo]
    ,[baz])
VALUES
    (0
    ,NULL
    ,NULL);

INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
VALUES
    ('A', 'A_'),
    ('B', 'B?');

INSERT INTO [dbo].[TestQueue]
EXEC [dbo].[TestMerge];

SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];

Результат

TestTarget
+----+-----+-----+
| ID | foo | bar |
+----+-----+-----+
|  1 | A   | AA  |
|  2 | B   | BB  |
|  3 | C   | CC  |
+----+-----+-----+

TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT      |
| UPDATE      |
| UPDATE      |
+-------------+

TestQueue
+----------+------+------+
| TargetID | foo  | baz  |
+----------+------+------+
|        0 | NULL | NULL |
|        3 | C    | CCC  |
+----------+------+------+

Во время INSERT ... EXEC появится дополнительное сообщение:

Duplicate key was ignored.

если MERGE обновлено несколько строк. Это предупреждающее сообщение отправляется, когда уникальный индекс отбрасывает некоторые строки во время INSERT из-за IGNORE_DUP_KEY = ON.

При добавлении повторяющихся значений ключа появится предупреждающее сообщение в уникальный индекс. Только строки, нарушающие ограничение единственности не удастся.

Ответ 2

Рассмотрим два подхода к решению проблемы:

  • Объединить данные в целевые и выходные данные, вставленные в очередь в одном из операторов, и суммировать статистику в триггере, созданном на целевом сервере. Пакетный идентификатор может быть передан в триггер через временную таблицу.
  • Объединить данные в целевые и выходные данные, вставленные в очередь в одном операторе, и суммировать статистику сразу же после слияния, используя встроенные возможности отслеживания изменений, вместо того, чтобы делать это в триггере.

Подход 1 (объединить данные и собрать статистику в триггере):

Пример настройки данных (индексы и ограничения опущены для простоты):

create table staging (foo varchar(10), bar varchar(10), baz varchar(10));
create table target (foo varchar(10), bar varchar(10));
create table queue (foo varchar(10), baz varchar(10));
create table stats (batchID int, inserted bigint, updated bigint, deleted bigint);

insert into staging values
    ('A', 'AA', 'AAA')
    ,('B', 'BB', 'BBB')
    ,('C', 'CC', 'CCC')
    ;

insert into target values
    ('A', 'A_')
    ,('B', 'B?')
    ,('E', 'EE')
    ;

Триггер для сбора вставленной/обновленной/удаленной статистики:

create trigger target_onChange
on target
after delete, update, insert
as
begin
    set nocount on;

    if object_id('tempdb..#targetMergeBatch') is NULL
        return;

    declare @batchID int;
    select @batchID = batchID from #targetMergeBatch;

    merge into stats t
    using (
        select
            batchID = @batchID,
            cntIns = count_big(case when i.foo is not NULL and d.foo is NULL then 1 end),
            cntUpd = count_big(case when i.foo is not NULL and d.foo is not NULL then 1 end),
            cntDel = count_big(case when i.foo is NULL and d.foo is not NULL then 1 end)
        from inserted i
            full join deleted d on d.foo = i.foo
    ) s
    on t.batchID = s.batchID
    when matched then
        update
        set
            t.inserted = t.inserted + s.cntIns,
            t.updated = t.updated + s.cntUpd,
            t.deleted = t.deleted + s.cntDel
    when not matched then
        insert (batchID, inserted, updated, deleted)
        values (s.batchID, s.cntIns, s.cntUpd, cntDel);

end

Операторы слияния:

declare @batchID int;
set @batchID = 1;-- or select @batchID = batchID from ...;

create table #targetMergeBatch (batchID int);
insert into #targetMergeBatch (batchID) values (@batchID);

insert into queue (foo, baz)
select foo, baz
from
(
    merge into target t
    using staging s
    on t.foo = s.foo
    when matched then
        update
        set t.bar = s.bar
    when not matched then
        insert (foo, bar)
        values (s.foo, s.bar)
    when not matched by source then
        delete
    output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
    ;

drop table #targetMergeBatch

Проверьте результаты:

select * from target;
select * from queue;
select * from stats;

Цель:

foo        bar
---------- ----------
A          AA
B          BB
C          CC

Queue

foo        baz
---------- ----------
C          CCC

Статистика:

batchID  inserted   updated   deleted
-------- ---------- --------- ---------
1        1          2         1

Подход 2 (собирайте статистику, используя возможности отслеживания изменений):

Настройка пробных данных такая же, как в предыдущем случае (просто отбросьте все, включая триггер и заново создайте таблицы с нуля), за исключением того, что в этом случае нам нужно иметь PK для цели, чтобы сделать работу образца:

create table target (foo varchar(10) primary key, bar varchar(10));

Включить отслеживание изменений в базе данных:

alter database Test
    set change_tracking = on

Включить отслеживание изменений в целевой таблице:

alter table target
    enable change_tracking

Слияние данных и статистика захвата сразу после этого, фильтрация по контексту изменения для подсчета только строк, затронутых слиянием:

begin transaction;
declare @batchID int, @chVersion bigint, @chContext varbinary(128);
set @batchID = 1;-- or select @batchID = batchID from ...;
SET @chVersion = change_tracking_current_version();
set @chContext = newid();

with change_tracking_context(@chContext)
insert into queue (foo, baz)
select foo, baz
from
(
    merge into target t
    using staging s
    on t.foo = s.foo
    when matched then
        update
        set t.bar = s.bar
    when not matched then
        insert (foo, bar)
        values (s.foo, s.bar)
    when not matched by source then
        delete
    output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
    ;

with ch(foo, op) as (
    select foo, sys_change_operation
    from changetable(changes target, @chVersion) ct
    where sys_change_context = @chContext
)
insert into stats (batchID, inserted, updated, deleted)
select @batchID, [I], [U], [D]
from ch
    pivot(count_big(foo) for op in ([I], [U], [D])) pvt
    ;

commit transaction;

Проверьте результаты:

select * from target;
select * from queue;
select * from stats;

Они такие же, как в предыдущем примере.

Цель:

foo        bar
---------- ----------
A          AA
B          BB
C          CC

Queue

foo        baz
---------- ----------
C          CCC

Статистика:

batchID  inserted   updated   deleted
-------- ---------- --------- ---------
1        1          2         1

Ответ 3

Я предлагаю извлечь статистику, используя кодирование с использованием трех независимых триггеров AFTER INSERT / DELETE / UPDATE по строкам:

create trigger dbo.insert_trigger_target
on [dbo].[target]
after insert
as
insert into dbo.[stats] ([action],[count])
select 'insert', count(1)
from inserted;
go

create trigger dbo.update_trigger_target
on [dbo].[target]
after update
as
insert into dbo.[stats] ([action],[count])
select 'update', count(1) from inserted -- or deleted == after / before image, count will be the same
go

create trigger dbo.delete_trigger_target
on [dbo].[target]
after delete
as
insert into dbo.[stats] ([action],[count])
select 'delete', count(1) from deleted
go

Если вам нужно больше контекста, поместите что-то в CONTEXT_INFO и вырвите его из триггеров.

Теперь я буду утверждать, что триггеры AFTER не так дороги, но вам нужно проверить это, чтобы быть уверенным.

Имея дело с этим, вы можете использовать предложение OUTPUT ( НЕ OUTPUT INTO) в MERGE, а затем использовать его, вложенное в выборку для подмножества данных что вы хотите войти в таблицу queue.

Обоснование

Из-за необходимости доступа к столбцам из staging и target для создания данных для queue, этот HAS выполняется с помощью параметра OUTPUT в MERGE, поскольку ничто иное не имеет доступа к "обеим сторонам".

Затем, если мы убьем предложение OUTPUT для queue, как мы можем переделать эту функциональность? Я думаю, что триггеры AFTER будут работать, учитывая требования к указанной вами статистике. Действительно, статистика может быть довольно сложной, если требуется, с учетом доступных изображений. Я утверждаю, что триггеры AFTER "не так дороги", поскольку данные до и после должны всегда быть доступны, чтобы транзакция могла быть как COMMITTED ИЛИ ROLLED BACK - да, данные должны быть отсканированы (даже чтобы получить счет), но это не похоже на слишком большую стоимость.

В моем собственном анализе сканирование добавило около 5% к базовой стоимости плана выполнения

Звучит как решение?

Ответ 4

Рассматривали ли вы слияние слияния и просто вставляете, где не существует и обновление? Затем вы можете использовать предложение вывода из вставки для заполнения вашей таблицы очередей.

Ответ 5

Импорт через промежуточную таблицу может быть более эффективным с последовательной, а не ориентированной на заданную обработку. Я хотел бы переписать MERGE в хранимую процедуру с помощью сканирования курсора. Затем для каждой записи вы можете иметь столько выходов, сколько хотите, плюс любые значения без поворота при общей стоимости одного сканирования таблицы staging.

Сохраненная процедура может также предоставлять возможности для разделения обработки на более мелкие транзакции, тогда как триггеры на больших наборах данных могут привести к переполнению журнала транзакций.

Ответ 6

Если мне что-то не хватает, простая команда вставки должна соответствовать всем вашим требованиям.

insert into queue
(foo, baz)
select staging.foo, staging.baz
from staging join target on staging.foo = target.boo
where whatever

Это произойдет после слияния в цель.

Только для новых записей сделайте это до слияния

insert into queue
(foo, baz)
select staging.foo, staging.baz
from staging left join target on staging.foo = target.boo
where target.foo = null