Подтвердить что ты не робот

Как вставить данные parellel в три разные таблицы

У меня есть хранимая процедура, которая будет вставлять большую часть записей, теперь есть ли возможность вставлять данные в 3 таблицы параллельно;

  • Первая таблица вставляет 1 миллион записей.
  • Вторая таблица содержит 1,5 миллиона записей.
  • Третья таблица вставки записей 500k

В соответствии с моим знанием - процедура вставки происходит одна за другой.

Итак, как я могу реализовать загрузку параллельно?

4b9b3361

Ответ 1

Заявления выполняются синхронно в партии T-SQL. Чтобы выполнять несколько операторов асинхронно и параллельно из хранимой процедуры, вам необходимо использовать несколько одновременных подключений к базе данных. Обратите внимание, что сложная часть с асинхронным исполнением определяет не только, когда все задачи завершены, но также и то, были ли они успешными или неудачными.

Метод 1: пакет SSIS

Создайте пакет SSIS для одновременного выполнения трех SQL-операторов. В SQL 2012 и более поздних версиях запустите пакет с помощью хранимых процедур каталога SSIS. В пред-SQL 2012 вам необходимо создать задание агента SQL для пакета и запустить с помощью sp_start_job.

Вам нужно будет проверить статус выполнения SSIS или статус задания агента SQL, чтобы определить завершение и результат успеха/сбоя.

Метод 2: Powershell и агент SQL

Выполните задание агента SQL, которое запускает Powershell script, который выполняет запросы параллельно с использованием фоновых заданий Powershell (команда Start-Job). script может возвращать код выхода, нуль для успеха и ненулевой для отказа, так что агент SQL может определить, удалось ли это выполнить. Проверьте статус задания агента SQL, чтобы определить завершение и результат успеха/отказа.

Метод 3: задания нескольких операторов SQL

Выполнять несколько заданий агента SQL одновременно, каждый с шагом задания T-SQL, содержащим импорт script. Проверьте статус задания агента SQL для каждого задания, чтобы определить завершение и результат успеха/отказа.

Метод 4: Сервисный брокер Используйте активированную очередь proc для параллельного выполнения сценариев импорта. Это может быть тупым, если вы раньше не использовали Service Broker, и важно следить за проверенными шаблонами. Я включил пример, чтобы вы начали (замените THROW на RAISERROR для пред-SQL 2012). В базе данных должен быть включен Service Broker, который включен по умолчанию, но отключен после восстановления или присоединения.

USE YourDatabase;
Go

--create proc that will be automatically executed (activated) when requests are waiting
CREATE PROC dbo.ExecuteTSqlTask
AS
SET NOCOUNT ON;

DECLARE
      @TSqlJobConversationHandle uniqueidentifier = NEWID()
    , @TSqlExecutionRequestMessage xml
    , @TSqlExecutionResultMessage xml
    , @TSqlExecutionResult varchar(10)
    , @TSqlExecutionResultDetails nvarchar(MAX)
    , @TSqlScript nvarchar(MAX)
    , @TSqlTaskName sysname
    , @RowsAffected int
    , @message_type_name sysname;

WHILE 1 = 1
BEGIN

    --get the next task to execute
    WAITFOR (
        RECEIVE TOP (1)
              @TSqlJobConversationHandle = conversation_handle
            , @TSqlExecutionRequestMessage = CAST(message_body AS xml)
            , @message_type_name = message_type_name
        FROM dbo.TSqlExecutionQueue
        ), TIMEOUT 1000;

    IF @@ROWCOUNT = 0
    BEGIN
        --no work to do - exit
        BREAK;
    END;

    IF @message_type_name = N'TSqlExecutionRequest'
    BEGIN

        --get task name and script
        SELECT
              @TSqlTaskName = @TSqlExecutionRequestMessage.value('(/TSqlTaskName)[1]', 'sysname')
            , @TSqlScript = @TSqlExecutionRequestMessage.value('(/TSqlScript)[1]', 'nvarchar(MAX)');

        --execute script
        BEGIN TRY
            EXEC sp_executesql @TSqlScript;
            SET @RowsAffected = @@ROWCOUNT;
            SET @TSqlExecutionResult = 'Completed';
            SET @TSqlExecutionResultDetails = CAST(@RowsAffected as varchar(10)) + ' rows affected';
        END TRY
        BEGIN CATCH
            SET @TSqlExecutionResult = 'Erred';
            SET @TSqlExecutionResultDetails = 
                  'Msg ' + CAST(ERROR_NUMBER() AS varchar(10))
                + ', Level ' + CAST(ERROR_SEVERITY() AS varchar(2))
                + ', State ' + CAST(ERROR_STATE() AS varchar(10))
                + ', Line ' + CAST(ERROR_LINE() AS varchar(10))
                + ': ' + ERROR_MESSAGE();
        END CATCH;

        --send execution result back to initiator
        SET @TSqlExecutionResultMessage = '<TSqlTaskName /><TSqlExecutionResult /><TSqlExecutionResultDetails />';
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlTaskName")} into (/TSqlTaskName)[1] ');
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResult")} into (/TSqlExecutionResult)[1] ');
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResultDetails")} into (/TSqlExecutionResultDetails)[1] ');
        SEND ON CONVERSATION @TSqlJobConversationHandle
            MESSAGE TYPE TSqlExecutionResult
            (@TSqlExecutionResultMessage);

    END
    ELSE
    BEGIN
        IF @message_type_name = N'TSqlJobComplete'
        BEGIN
            --service has ended conversation so we're not going to get any more execution requests
            END CONVERSATION @TSqlJobConversationHandle;
        END
        ELSE
        BEGIN
            END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteTSqlTask';
            RAISERROR('Unexpected message type received (%s) by ExecuteTSqlTask', 16, 1, @message_type_name);
        END;
    END;
END;
GO

CREATE QUEUE dbo.TSqlResultQueue;
CREATE QUEUE dbo.TSqlExecutionQueue
    WITH STATUS=ON,
    ACTIVATION (
          STATUS = ON
        , PROCEDURE_NAME = dbo.ExecuteTSqlTask
        , MAX_QUEUE_READERS = 3 --max number of concurrent activated proc instances 
        , EXECUTE AS OWNER
        );
CREATE MESSAGE TYPE TSqlExecutionRequest VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlExecutionResult VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlJobComplete VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT TSqlExecutionContract (
      TSqlExecutionRequest SENT BY INITIATOR
    , TSqlJobComplete SENT BY INITIATOR
    , TSqlExecutionResult SENT BY TARGET
    );
CREATE SERVICE TSqlJobService ON QUEUE dbo.TSqlResultQueue ([TSqlExecutionContract]);
CREATE SERVICE TSqlExecutorService ON QUEUE dbo.TSqlExecutionQueue ([TSqlExecutionContract]);
GO

CREATE PROC dbo.ExecuteParallelImportScripts
AS
SET NOCOUNT ON;

DECLARE
      @TSqlJobConversationHandle uniqueidentifier
    , @TSqlExecutionRequestMessage xml
    , @TSqlExecutionResultMessage xml
    , @TSqlExecutionResult varchar(10)
    , @TSqlExecutionResultDetails nvarchar(MAX)
    , @TSqlTaskName sysname
    , @CompletedCount int = 0
    , @ErredCount int = 0
    , @message_type_name sysname;

DECLARE @TsqlTask TABLE(
      TSqlTaskName sysname NOT NULL PRIMARY KEY 
    , TSqlScript nvarchar(MAX) NOT NULL
    );

BEGIN TRY

    --insert a row for each import task
    INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript) 
        VALUES(N'ImportScript1', N'INSERT INTO dbo.Table1 SELECT * FROM dbo.Table1Staging;');
    INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript) 
        VALUES(N'ImportScript2', N'INSERT INTO dbo.Table2 SELECT * FROM dbo.Table2Staging;');
    INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript) 
        VALUES(N'ImportScript3', N'INSERT INTO dbo.Table3 SELECT * FROM dbo.Table3Staging;');

    --start a conversation for this import process
    BEGIN DIALOG CONVERSATION @TsqlJobConversationHandle
        FROM SERVICE TSqlJobService
        TO SERVICE 'TSqlExecutorService', 'CURRENT DATABASE'
        ON CONTRACT TSqlExecutionContract
        WITH ENCRYPTION = OFF;

    --send import tasks to executor service for parallel execution
    DECLARE JobTasks CURSOR LOCAL FAST_FORWARD FOR
        SELECT (SELECT TSqlTaskName, TSqlScript
            FROM @TsqlTask AS task 
            WHERE task.TSqlTaskName = job.TSqlTaskName
            FOR XML PATH(''), TYPE) AS TSqlExecutionRequest
        FROM @TsqlTask AS job;
    OPEN JobTasks;
    WHILE 1 = 1
    BEGIN
        FETCH NEXT FROM JobTasks INTO @TSqlExecutionRequestMessage;
        IF @@FETCH_STATUS = -1 BREAK;
        SEND ON CONVERSATION @TSqlJobConversationHandle
            MESSAGE TYPE TSqlExecutionRequest
            (@TSqlExecutionRequestMessage);
    END;
    CLOSE JobTasks;
    DEALLOCATE JobTasks;

    --get each parallel task execution result until all are complete
    WHILE 1 = 1
    BEGIN

        --get next task result
        WAITFOR (
            RECEIVE TOP (1)
                  @TSqlExecutionResultMessage = CAST(message_body AS xml)
                , @message_type_name = message_type_name
            FROM dbo.TSqlResultQueue
            WHERE conversation_handle = @TSqlJobConversationHandle
            ), TIMEOUT 1000;

        IF @@ROWCOUNT <> 0
        BEGIN

            IF @message_type_name = N'TSqlExecutionResult'
            BEGIN

                --get result of import script execution
                SELECT
                      @TSqlTaskName = @TSqlExecutionResultMessage.value('(/TSqlTaskName)[1]', 'sysname')
                    , @TSqlExecutionResult = @TSqlExecutionResultMessage.value('(/TSqlExecutionResult)[1]', 'varchar(10)')
                    , @TSqlExecutionResultDetails = COALESCE(@TSqlExecutionResultMessage.value('(/TSqlExecutionResultDetails)[1]', 'nvarchar(MAX)'), N'');
                RAISERROR('Import task %s %s: %s', 0, 0, @TSqlTaskName, @TSqlExecutionResult, @TSqlExecutionResultDetails) WITH NOWAIT;
                IF @TSqlExecutionResult = 'Completed'
                BEGIN
                    SET @CompletedCount += 1;
                END
                ELSE
                BEGIN
                    SET @ErredCount += 1;
                END;

                --remove task from tracking table after completion
                DELETE FROM @TSqlTask
                WHERE TSqlTaskName = @TSqlTaskName;

                IF NOT EXISTS(SELECT 1 FROM @TsqlTask)
                BEGIN
                    --all tasks are done - send TSqlJobComplete message to instruct executor service to end conversation
                    SEND ON CONVERSATION @TSqlJobConversationHandle
                        MESSAGE TYPE TSqlJobComplete;
                END
            END
            ELSE
            BEGIN
                IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
                BEGIN
                    --executor service has ended conversation so we're done
                    END CONVERSATION @TSqlJobConversationHandle;
                    BREAK;
                END
                ELSE
                BEGIN
                    END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteParallelInserts';
                    RAISERROR('Unexpected message type received (%s) by ExecuteParallelInserts', 16, 1, @message_type_name);
                END;
            END
        END;
    END;
    RAISERROR('Import processing completed. CompletedCount=%d, ErredCount=%d.', 0, 0, @CompletedCount, @ErredCount);
END TRY
BEGIN CATCH
    THROW;
END CATCH;
GO

--execute import scripts in parallel
EXEC dbo.ExecuteParallelImportScripts;
GO

Ответ 2

Вы можете попробовать создать три задания и выполнить скрипты вставки параллельно, как показано ниже:

DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N'Job1', 
        @enabled=1,  
        @description=N'No description available.', 
        @job_id = @jobId OUTPUT

EXEC msdb.dbo.sp_add_jobstep @[email protected], @step_name=N'Insert into First Table', 
        @step_id=1, 
        @cmdexec_success_code=0, 
        @on_success_action=1, 
        @on_success_step_id=0, 
        @on_fail_action=2, 
        @on_fail_step_id=0, 
        @retry_attempts=0, 
        @retry_interval=0, 
        @os_run_priority=0, @subsystem=N'TSQL', 
        @command=N'--Insert script for first table', 
        @database_name=N'Test', 
        @flags=0

EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
GO

DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N'Job2', 
        @enabled=1,  
        @description=N'No description available.', 
        @job_id = @jobId OUTPUT

EXEC msdb.dbo.sp_add_jobstep @[email protected], @step_name=N'Insert into second Table', 
        @step_id=1, 
        @cmdexec_success_code=0, 
        @on_success_action=1, 
        @on_success_step_id=0, 
        @on_fail_action=2, 
        @on_fail_step_id=0, 
        @retry_attempts=0, 
        @retry_interval=0, 
        @os_run_priority=0, @subsystem=N'TSQL', 
        @command=N'--Insert script for second table', 
        @database_name=N'Test', 
        @flags=0

EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
GO

DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N'Job3', 
        @enabled=1,  
        @description=N'No description available.', 
        @job_id = @jobId OUTPUT

EXEC msdb.dbo.sp_add_jobstep @[email protected], @step_name=N'Insert into Third Table', 
        @step_id=1, 
        @cmdexec_success_code=0, 
        @on_success_action=1, 
        @on_success_step_id=0, 
        @on_fail_action=2, 
        @on_fail_step_id=0, 
        @retry_attempts=0, 
        @retry_interval=0, 
        @os_run_priority=0, @subsystem=N'TSQL', 
        @command=N'--Insert script for third table', 
        @database_name=N'Test', 
        @flags=0

EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
GO

EXEC msdb.dbo.sp_start_job N'Job1' ; --All will execute in parallel
EXEC msdb.dbo.sp_start_job N'Job2' ;
EXEC msdb.dbo.sp_start_job N'Job3' ;

Ответ 3

Предполагая, что вы хотите иметь одинаковое значение даты вставки для всех вставок, определите параметр даты, установленный на текущую дату, как показано.

DECLARE @InsertDate as date
SET @InsertDate = GetDate()

Затем передайте параметр даты вставки в вашу хранимую процедуру вставки и обновите эту хранимую процедуру, чтобы использовать этот ввод. Это гарантирует, что одно и то же значение даты вставки будет использоваться для всех вставок.

EXEC dbo.InsertTables123 @p1 = @InsertDate

Параметр ввода @InsertDate также может быть назначен вручную, если требуется другое, чем текущая дата.

Ответ 4

Для вашей процедуры я предполагаю, что у вас есть имя таблицы и местоположение файла в качестве параметров.

Если у вас большой файл с 3 миллионами записей, вам нужно разбить файл на 3 небольших файла (если вы знаете какой-либо другой язык, отличный от sql), после этого вы можете открыть 3 консоли командной строки Sql, и вызовите процедуру в каждой консоли. Это сделает параллельную установку. Или вы знаете какие-либо другие языки программирования, вы можете использовать несколько потоков для вызова процедуры.

Ответ 5

Все три таблицы идентичны по структуре и контенту? Если это так, используйте репликацию транзакций/слияния

В качестве альтернативы, создайте триггер для первой таблицы, чтобы вставить во вторую и третью таблицу