Подтвердить что ты не робот

Dapper Bulk Insert Возвращает серийные идентификаторы

Я пытаюсь выполнить массивную вставку, используя Dapper над Npgsql, которая возвращает идентификаторы вновь вставленных строк. Следующая инструкция insert используется в обоих моих примерах:

var query = "INSERT INTO \"MyTable\" (\"Value\") VALUES (@Value) RETURNING \"ID\"";

Сначала я попытался добавить массив объектов со свойством "Значение":

var values = new[] {
    new { Value = 0.0 },
    new { Value = 0.5 }
};
var ids = connection.Query<int>(query, values);

Однако это не работает с NpgsqlException: "ERROR: 42703: значение столбца" не существует ". Прочитав этот вопрос, я подумал, что, возможно, мне нужно передать объект DataTable вместо массива объектов:

var dataTable = new DataTable();
dataTable.Columns.Add("Value", typeof(double));
dataTable.Rows.Add(0.0);
dataTable.Rows.Add(0.5);
var ids = connection.Query<int>(query, dataTable);

Однако это не удается с тем же самым исключением. Как выполнить массивную вставку и получить результирующие последовательные идентификаторы из Dapper над Npgsql?

Я заметил, что оболочка исключения не соответствует имени столбца, но я уверен, что у меня есть кавычки вокруг имен таблиц и столбцов, поэтому я не уверен, почему он говорит "значение" вместо "Значение" "в исключении. Просто подумал, что я бы упомянул об этом на случай, если он каким-то образом связан с ошибкой, так как легко заметить корпус.

- EDIT -

Чтобы уточнить, это SQL, чтобы создать таблицу

CREATE TABLE "MyTable" (
    "ID" SERIAL PRIMARY KEY,
    "Value" DOUBLE PRECISION NOT NULL
);

И используя переменные "запрос" и "значения", определенные выше, это код, который работает для каждой строки:

var ids = new List<int>();
foreach (var valueObj in values) {
    var queryParams = new DynamicParamaters();
    queryParams.Add("Value", valueObj.Value);
    ids.AddRange(connection.Query<int>(query, queryParams));
}

Проблема в том, что мне нужно иметь возможность вставлять сотни (возможно, тысячи в ближайшем будущем) строк в секунду в "MyTable", поэтому ожидание этого цикла для итеративного отправления каждого значения в базу данных является громоздким и (I предположим, но еще не достигли контрольных показателей). Кроме того, я выполняю дополнительные вычисления для значений, которые могут или не могут привести к дополнительным вставкам, где мне нужна ссылка внешнего ключа на запись "MyTable".

Из-за этих проблем я ищу альтернативу, которая отправляет все значения в один оператор в базу данных, чтобы уменьшить сетевой трафик и задержку обработки. Опять же, я НЕ тестировал итеративный подход еще... то, что я ищу, является альтернативой, которая делает объемную вставку, поэтому я могу сравнить два подхода друг к другу.

4b9b3361

Ответ 1

В конечном итоге я предложил четыре разных подхода к этой проблеме. Я сгенерировал 500 случайных значений для вставки в MyTable и рассчитал время для каждого из четырех подходов (включая запуск и откат транзакции, в которой она была запущена). В моем тесте база данных расположена на localhost. Тем не менее, для решения с наилучшей производительностью также требуется всего одна поездка на сервер базы данных, поэтому лучшее решение, которое я нашел, должно по-прежнему превосходить альтернативы при развертывании на сервере, отличном от базы данных.

Обратите внимание, что переменные connection и transaction используются в следующем коде и считаются действительными объектами данных Npgsql. Также обратите внимание, что обозначение Nx медленнее указывает на то, что операция занимает количество времени, равное оптимальному решению, умноженному на N.

Подход № 1 (1 494 мс = 18,7х медленнее): развернуть массив в отдельные параметры

public List<MyTable> InsertEntries(double[] entries)
{
    // Create a variable used to dynamically build the query
    var query = new StringBuilder(
        "INSERT INTO \"MyTable\" (\"Value\") VALUES ");

    // Create the dictionary used to store the query parameters
    var queryParams = new DynamicParameters();

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add a unique parameter for each id
    var paramIdx = 0;
    foreach (var entry in result)
    {
        var paramName = string.Format("value{1:D6}", paramIdx);
        if (0 < paramIdx++) query.Append(',');
        query.AppendFormat("(:{0})", paramName);
        queryParams.Add(paramName, entry.Value);
    }
    query.Append(" RETURNING \"ID\"");

    // Execute the query, and store the ids
    var ids = connection.Query<int>(query, queryParams, transaction);
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

Я действительно не уверен, почему это оказалось самым медленным, так как для этого требуется всего лишь одно обращение к базе данных, но это было так.

Подход № 2 (267 мс = 3,3 раза медленнее): стандартная итерация цикла

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each entry to the database
    foreach (var entry in result)
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        entry.ID = connection.Query<int>(
            query, queryParams, transaction);
    }

    // Return the result
    return result;
}

Я был шокирован, что это было всего лишь в 3,3 раза медленнее, чем оптимальное решение, но я ожидал, что в реальной среде это значительно ухудшится, поскольку это решение требует последовательной отправки на сервер 500 сообщений. Однако это также самое простое решение.

Подход № 3 (223 мс = 2,8 раза медленнее): асинхронная итерация цикла

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each entry to the database asynchronously
    var taskList = new List<Task<IEnumerable<int>>>();
    foreach (var entry in result)
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        taskList.Add(connection.QueryAsync<int>(
            query, queryParams, transaction));
    }

    // Now that all queries have been sent, start reading the results
    for (var i = 0; i < result.Count; ++i)
    {
        result[i].ID = taskList[i].Result.First();
    }

    // Return the result
    return result;
}

Это становится лучше, но все еще менее чем оптимально, потому что мы можем поставить в очередь столько вставок, сколько имеется доступных потоков в пуле потоков. Однако это почти так же просто, как и многопоточный подход, поэтому это хороший компромисс между скоростью и удобочитаемостью.

Подход № 4 (134 мс = 1,7 раза медленнее): массовые вставки

Этот подход требует, чтобы следующий Postgres SQL был определен до запуска сегмента кода под ним:

CREATE TYPE "MyTableType" AS (
    "Value" DOUBLE PRECISION
);

CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
    RETURNS SETOF INT AS $$

    DECLARE
        insertCmd TEXT := 'INSERT INTO "MyTable" ("Value") '
            'VALUES ($1) RETURNING "ID"';
        entry "MyTableType";
    BEGIN
        FOREACH entry IN ARRAY entries LOOP
            RETURN QUERY EXECUTE insertCmd USING entry."Value";
        END LOOP;
    END;
$$ LANGUAGE PLPGSQL;

И связанный код:

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "SELECT * FROM \"InsertIntoMyTable\"(:entries::\"MyTableType\")";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Convert each entry into a Postgres string
    var entryStrings = result.Select(
        e => string.Format("({0:E16})", e.Value).ToArray();

    // Create a parameter for the array of MyTable entries
    var queryParam = new {entries = entryStrings};

    // Perform the insert
    var ids = connection.Query<int>(query, queryParam, transaction);

    // Assign each id to the result
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

У меня есть две проблемы с этим подходом. Во-первых, я должен жестко закодировать порядок элементов MyTableType. Если этот порядок изменится, я должен изменить этот код, чтобы он соответствовал. Во-вторых, мне нужно преобразовать все входные значения в строку перед отправкой их в postgres (в реальном коде у меня более одного столбца, поэтому я не могу просто изменить сигнатуру функции базы данных, чтобы получить двойную precision [], если я не передам N массивов, где N - количество полей в MyTableType).

Несмотря на эти подводные камни, это становится все ближе к идеалу и требует только одного обращения к базе данных.

- НАЧАТЬ РЕДАКТИРОВАТЬ -

Начиная с первоначального поста, я придумал четыре дополнительных подхода, которые работают быстрее, чем перечисленные выше. Я изменил Nx более медленные числа, чтобы отразить новый самый быстрый метод, ниже.

Подход № 5 (105 мс = в 1,3 раза медленнее): такой же, как № 4, без динамического запроса

Единственное различие между этим подходом и подходом № 4 заключается в следующем изменении функции "InsertIntoMyTable":

CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
    RETURNS SETOF INT AS $$

    DECLARE
        entry "MyTableType";
    BEGIN
        FOREACH entry IN ARRAY entries LOOP
            RETURN QUERY INSERT INTO "MyTable" ("Value")
                VALUES (entry."Value") RETURNING "ID";
        END LOOP;
    END;
$$ LANGUAGE PLPGSQL;

Помимо проблем, связанных с подходом № 4, недостатком этого является то, что в производственной среде раздел MyTable разделен. Используя этот подход, мне нужен один метод для целевого раздела.

Подход № 6 (89 мс = 1,1x медленнее): вставить оператор с аргументом массива

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") SELECT a.* FROM " +
            "UNNEST(:entries::\"MyTableType\") a RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Convert each entry into a Postgres string
    var entryStrings = result.Select(
        e => string.Format("({0:E16})", e.Value).ToArray();

    // Create a parameter for the array of MyTable entries
    var queryParam = new {entries = entryStrings};

    // Perform the insert
    var ids = connection.Query<int>(query, queryParam, transaction);

    // Assign each id to the result
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

Единственным недостатком этого является то же самое, что и первая проблема с подходом № 4. А именно, это "MyTableType" реализацию с упорядочением "MyTableType". Тем не менее я обнаружил, что это мой второй любимый подход, поскольку он очень быстрый и не требует каких-либо функций базы данных для правильной работы.

Подход № 7 (80 мс = ОЧЕНЬ немного медленнее): То же, что № 1, но без параметров

public List<MyTable> InsertEntries(double[] entries)
{
    // Create a variable used to dynamically build the query
    var query = new StringBuilder(
        "INSERT INTO \"MyTable\" (\"Value\") VALUES");

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each row directly into the insert statement
    for (var i = 0; i < result.Count; ++i)
    {
        entry = result[i];
        query.Append(i == 0 ? ' ' : ',');
        query.AppendFormat("({0:E16})", entry.Value);
    }
    query.Append(" RETURNING \"ID\"");

    // Execute the query, and store the ids
    var ids = connection.Query<int>(query, null, transaction);
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

Это мой любимый подход. Это только немного медленнее, чем самый быстрый (даже с 4000 записей, он все еще работает менее 1 секунды), но не требует специальных функций или типов базы данных. Единственное, что мне не нравится в этом, - это то, что я должен упорядочить значения двойной точности, только чтобы Postgres снова проанализировал их. Было бы предпочтительнее отправлять значения в двоичном виде, чтобы они занимали 8 байтов вместо огромных 20 байтов, которые я выделил для них.

Подход № 8 (80 мс): такой же, как № 5, но в чистом sql

Единственная разница между этим подходом и подходом № 5 заключается в следующем изменении функции "InsertIntoMyTable":

CREATE FUNCTION "InsertIntoMyTable"(
    entries "MyTableType"[]) RETURNS SETOF INT AS $$

    INSERT INTO "MyTable" ("Value")
        SELECT a.* FROM UNNEST(entries) a RETURNING "ID";
$$ LANGUAGE SQL;

Этот подход, как и № 5, требует одной функции на раздел "MyTable". Это самый быстрый способ, поскольку план запроса может быть сгенерирован один раз для каждой функции, а затем использован повторно. В других подходах запрос должен быть проанализирован, затем спланирован, а затем выполнен. Несмотря на то, что он был самым быстрым, я не выбрал его из-за дополнительных требований к базе данных по сравнению с подходом № 7, с очень небольшим выигрышем в скорости.