Я хочу использовать таблицу базы данных в качестве очереди. Я хочу вставить в него и взять элементы из него в вставленном порядке (FIFO). Мое главное соображение - производительность, потому что у меня есть тысячи этих транзакций каждую секунду. Поэтому я хочу использовать SQL-запрос, который дает мне первый элемент без поиска всей таблицы. Я не удаляю строку, когда я ее читаю. Помогает ли SELECT TOP 1.....? Должен ли я использовать какие-либо специальные индексы?
Использование таблицы базы данных в виде очереди
Ответ 1
Я бы использовал поле IDENTITY в качестве первичного ключа для предоставления уникально увеличивающегося идентификатора для каждого поставленного в очередь элемента и привязал к нему кластерный индекс. Это будет представлять порядок, в котором элементы были поставлены в очередь.
Чтобы сохранить элементы в таблице очередей во время их обработки, вам понадобится поле "статус", указывающее текущее состояние определенного элемента (например, 0 = ожидание, 1 = обрабатываемый, 2 = обработанный). Это необходимо, чтобы предотвратить обработку элемента дважды.
При обработке элементов в очереди вам нужно будет найти следующий элемент в таблице, которая не обрабатывается в настоящее время. Это должно быть сделано таким образом, чтобы не допустить одновременного процесса обработки одного и того же элемента, как показано ниже. Обратите внимание на подсказки таблицы UPDLOCK и READPAST, о которых вы должны знать при реализации очередей.
например. внутри sproc, что-то вроде этого:
DECLARE @NextID INTEGER
BEGIN TRANSACTION
-- Find the next queued item that is waiting to be processed
SELECT TOP 1 @NextID = ID
FROM MyQueueTable WITH (UPDLOCK, READPAST)
WHERE StateField = 0
ORDER BY ID ASC
-- if we've found one, mark it as being processed
IF @NextId IS NOT NULL
UPDATE MyQueueTable SET Status = 1 WHERE ID = @NextId
COMMIT TRANSACTION
-- If we've got an item from the queue, return to whatever is going to process it
IF @NextId IS NOT NULL
SELECT * FROM MyQueueTable WHERE ID = @NextID
Если обработка элемента не удалась, вы хотите попробовать позже? Если это так, вам нужно либо reset вернуть статус 0 или что-то еще. Это потребует больше размышлений.
В качестве альтернативы, не используйте таблицу базы данных в качестве очереди, но что-то вроде MSMQ - просто подумайте, что я выброшу это в микс!
Ответ 2
Если вы не удалите обработанные строки, вам понадобится какой-то флаг, который указывает, что строка уже обработана.
Поместите указатель на этот флаг и в колонку, которую вы собираетесь заказать.
Разделите таблицу над этим флагом, так что транзакции с отменой транзакций не засоряют ваши запросы.
Если бы вы действительно получали сообщения 1.000
каждую секунду, это привело бы к 86.400.000
rows в день. Возможно, вам захочется подумать о том, как очистить старые строки.
Ответ 3
Все зависит от вашего механизма/реализации базы данных.
Для меня простые очереди на таблицах со следующими столбцами:
id / task / priority / date_added
обычно работает.
Я использовал приоритет и задачу для группировки задач, и в случае удвоенной задачи я выбрал вариант с более высоким приоритетом.
И не волнуйтесь - для современных баз данных "тысячи" ничего особенного.
Ответ 4
возможно добавление LIMIT = 1 в ваш оператор select поможет... заставлять возвращение после одного совпадения...
Ответ 5
Создайте кластерный индекс за столбец даты (или автоинкремента). Это будет содержать строки в таблице примерно в порядке индекса и разрешать быстрый доступ на основе индексов, если вы ORDER BY
индексированный столбец. Использование TOP X
(или LIMIT X
, в зависимости от вашего RDMBS) будет извлекать только первые х элементов из индекса.
Предупреждение о производительности: вы всегда должны просматривать планы выполнения своих запросов (на реальных данных), чтобы убедиться, что оптимизатор не делает неожиданных вещей. Также попытайтесь сравнить ваши запросы (опять-таки по реальным данным), чтобы принимать обоснованные решения.
Ответ 6
Это не будет проблемой, если вы используете что-то, чтобы отслеживать дату и время вставки. См. Здесь параметры mysql. Вопрос в том, нужен ли вам только абсолютный последний отправленный элемент или вам нужно итерации. Если вам нужно итерации, то вам нужно захватить кусок с помощью оператора ORDER BY
, пропустить цикл и запомнить последнее время datetime, чтобы вы могли использовать его, когда вы захватываете свой следующий фрагмент.
Ответ 7
Поскольку вы не удаляете записи из таблицы, вам нужно иметь составной индекс на (processed, id)
, где processed
- это столбец, который указывает, была ли обработана текущая запись.
Лучше всего создать секционированную таблицу для ваших записей и сделать поле processed
ключевым словом. Таким образом, вы можете сохранить три или более локальных индекса.
Однако, если вы всегда обрабатываете записи в порядке id
и имеете только два состояния, обновление записи означало бы просто запись записи из первого листа индекса и добавление его к последнему листу
В текущей обработанной записи всегда будет наименьшее значение id
всех необработанных записей и наибольшая id
всех обработанных записей.
Ответ 8
Очень простое решение для этого, чтобы не иметь транзакций, блокировок и т.д., использовать механизмы отслеживания изменений (а не сбор данных). Он использует управление версиями для каждой добавленной/обновленной/удаленной строки, чтобы вы могли отслеживать, какие изменения произошли после конкретной версии.
Итак, вы сохраняете последнюю версию и запрашиваете новые изменения.
Если запрос выходит из строя, вы всегда можете вернуться назад и запросить данные из последней версии. Кроме того, если вы не хотите получать все изменения с помощью одного запроса, вы можете получить верхний порядок по последней версии и сохранить самую большую версию, которую я бы запросил.
См. это, например Использование отслеживания изменений в SQL Server 2008
Ответ 9
У меня был тот же самый общий вопрос "как превратить стол в очередь", и я не мог найти ответ, который хотел где-либо.
Вот что я придумал для Node/SQLite/better-sqlite3. В основном просто измените внутренние WHERE
и ORDER BY
для вашего варианта использования.
module.exports.pickBatchInstructions = (db, batchSize) => {
const buf = crypto.randomBytes(8); // Create a unique batch identifier
const q_pickBatch = '
UPDATE
instructions
SET
status = '${status.INSTRUCTION_INPROGRESS}',
run_id = '${buf.toString("hex")}',
mdate = datetime(datetime(), 'localtime')
WHERE
id IN (SELECT id
FROM instructions
WHERE
status is not '${status.INSTRUCTION_COMPLETE}'
and run_id is null
ORDER BY
length(targetpath), id
LIMIT ${batchSize});
';
db.run(q_pickBatch); // Change the status and set the run id
const q_getInstructions = '
SELECT
*
FROM
instructions
WHERE
run_id = '${buf.toString("hex")}'
';
const rows = db.all(q_getInstructions); // Get all rows with this batch id
return rows;
};