Подтвердить что ты не робот

Использование таблицы базы данных в виде очереди

Я хочу использовать таблицу базы данных в качестве очереди. Я хочу вставить в него и взять элементы из него в вставленном порядке (FIFO). Мое главное соображение - производительность, потому что у меня есть тысячи этих транзакций каждую секунду. Поэтому я хочу использовать SQL-запрос, который дает мне первый элемент без поиска всей таблицы. Я не удаляю строку, когда я ее читаю. Помогает ли SELECT TOP 1.....? Должен ли я использовать какие-либо специальные индексы?

4b9b3361

Ответ 1

Я бы использовал поле IDENTITY в качестве первичного ключа для предоставления уникально увеличивающегося идентификатора для каждого поставленного в очередь элемента и привязал к нему кластерный индекс. Это будет представлять порядок, в котором элементы были поставлены в очередь.

Чтобы сохранить элементы в таблице очередей во время их обработки, вам понадобится поле "статус", указывающее текущее состояние определенного элемента (например, 0 = ожидание, 1 = обрабатываемый, 2 = обработанный). Это необходимо, чтобы предотвратить обработку элемента дважды.

При обработке элементов в очереди вам нужно будет найти следующий элемент в таблице, которая не обрабатывается в настоящее время. Это должно быть сделано таким образом, чтобы не допустить одновременного процесса обработки одного и того же элемента, как показано ниже. Обратите внимание на подсказки таблицы UPDLOCK и READPAST, о которых вы должны знать при реализации очередей.

например. внутри sproc, что-то вроде этого:

DECLARE @NextID INTEGER

BEGIN TRANSACTION

-- Find the next queued item that is waiting to be processed
SELECT TOP 1 @NextID = ID
FROM MyQueueTable WITH (UPDLOCK, READPAST)
WHERE StateField = 0
ORDER BY ID ASC

-- if we've found one, mark it as being processed
IF @NextId IS NOT NULL
    UPDATE MyQueueTable SET Status = 1 WHERE ID = @NextId

COMMIT TRANSACTION

-- If we've got an item from the queue, return to whatever is going to process it
IF @NextId IS NOT NULL
    SELECT * FROM MyQueueTable WHERE ID = @NextID

Если обработка элемента не удалась, вы хотите попробовать позже? Если это так, вам нужно либо reset вернуть статус 0 или что-то еще. Это потребует больше размышлений.

В качестве альтернативы, не используйте таблицу базы данных в качестве очереди, но что-то вроде MSMQ - просто подумайте, что я выброшу это в микс!

Ответ 2

Если вы не удалите обработанные строки, вам понадобится какой-то флаг, который указывает, что строка уже обработана.

Поместите указатель на этот флаг и в колонку, которую вы собираетесь заказать.

Разделите таблицу над этим флагом, так что транзакции с отменой транзакций не засоряют ваши запросы.

Если бы вы действительно получали сообщения 1.000 каждую секунду, это привело бы к 86.400.000 rows в день. Возможно, вам захочется подумать о том, как очистить старые строки.

Ответ 3

Все зависит от вашего механизма/реализации базы данных.

Для меня простые очереди на таблицах со следующими столбцами:

id / task / priority / date_added

обычно работает.

Я использовал приоритет и задачу для группировки задач, и в случае удвоенной задачи я выбрал вариант с более высоким приоритетом.

И не волнуйтесь - для современных баз данных "тысячи" ничего особенного.

Ответ 4

возможно добавление LIMIT = 1 в ваш оператор select поможет... заставлять возвращение после одного совпадения...

Ответ 5

Создайте кластерный индекс за столбец даты (или автоинкремента). Это будет содержать строки в таблице примерно в порядке индекса и разрешать быстрый доступ на основе индексов, если вы ORDER BY индексированный столбец. Использование TOP X (или LIMIT X, в зависимости от вашего RDMBS) будет извлекать только первые х элементов из индекса.

Предупреждение о производительности: вы всегда должны просматривать планы выполнения своих запросов (на реальных данных), чтобы убедиться, что оптимизатор не делает неожиданных вещей. Также попытайтесь сравнить ваши запросы (опять-таки по реальным данным), чтобы принимать обоснованные решения.

Ответ 6

Это не будет проблемой, если вы используете что-то, чтобы отслеживать дату и время вставки. См. Здесь параметры mysql. Вопрос в том, нужен ли вам только абсолютный последний отправленный элемент или вам нужно итерации. Если вам нужно итерации, то вам нужно захватить кусок с помощью оператора ORDER BY, пропустить цикл и запомнить последнее время datetime, чтобы вы могли использовать его, когда вы захватываете свой следующий фрагмент.

Ответ 7

Поскольку вы не удаляете записи из таблицы, вам нужно иметь составной индекс на (processed, id), где processed - это столбец, который указывает, была ли обработана текущая запись.

Лучше всего создать секционированную таблицу для ваших записей и сделать поле processed ключевым словом. Таким образом, вы можете сохранить три или более локальных индекса.

Однако, если вы всегда обрабатываете записи в порядке id и имеете только два состояния, обновление записи означало бы просто запись записи из первого листа индекса и добавление его к последнему листу

В текущей обработанной записи всегда будет наименьшее значение id всех необработанных записей и наибольшая id всех обработанных записей.

Ответ 8

Очень простое решение для этого, чтобы не иметь транзакций, блокировок и т.д., использовать механизмы отслеживания изменений (а не сбор данных). Он использует управление версиями для каждой добавленной/обновленной/удаленной строки, чтобы вы могли отслеживать, какие изменения произошли после конкретной версии.

Итак, вы сохраняете последнюю версию и запрашиваете новые изменения.

Если запрос выходит из строя, вы всегда можете вернуться назад и запросить данные из последней версии. Кроме того, если вы не хотите получать все изменения с помощью одного запроса, вы можете получить верхний порядок по последней версии и сохранить самую большую версию, которую я бы запросил.

См. это, например Использование отслеживания изменений в SQL Server 2008

Ответ 9

У меня был тот же самый общий вопрос "как превратить стол в очередь", и я не мог найти ответ, который хотел где-либо.

Вот что я придумал для Node/SQLite/better-sqlite3. В основном просто измените внутренние WHERE и ORDER BY для вашего варианта использования.

module.exports.pickBatchInstructions = (db, batchSize) => {
  const buf = crypto.randomBytes(8); // Create a unique batch identifier

  const q_pickBatch = '
    UPDATE
      instructions
    SET
      status = '${status.INSTRUCTION_INPROGRESS}',  
      run_id = '${buf.toString("hex")}',
      mdate = datetime(datetime(), 'localtime')
    WHERE
      id IN (SELECT id 
        FROM instructions 
        WHERE 
          status is not '${status.INSTRUCTION_COMPLETE}'
          and run_id is null
        ORDER BY
          length(targetpath), id
        LIMIT ${batchSize});
  ';
  db.run(q_pickBatch); // Change the status and set the run id

  const q_getInstructions = '
    SELECT
      *
    FROM
      instructions
    WHERE
      run_id = '${buf.toString("hex")}'
  ';
  const rows = db.all(q_getInstructions); // Get all rows with this batch id

  return rows;
};