Подтвердить что ты не робот

Каков наилучший способ реализации таблицы очереди сообщений в mysql

Вероятно, в десятый раз я реализую что-то подобное, и я никогда не был на 100% доволен решениями, которые я придумал.

Причина использования таблицы mysql вместо "правильной" системы обмена сообщениями привлекательна прежде всего тем, что большинство приложений уже используют некоторую реляционную базу данных для других вещей (которая, как правило, является mysql для большей части того, что я делал), в то время как очень мало приложений используют систему обмена сообщениями. Кроме того, реляционные базы данных имеют очень сильные свойства ACID, в то время как системы обмена сообщениями часто не работают.

Первой идеей является использование:

create table jobs(
  id auto_increment not null primary key,
  message text not null,
  process_id varbinary(255) null default null,
  key jobs_key(process_id) 
);

И затем enqueue выглядит так:

insert into jobs(message) values('blah blah');

И dequeue выглядит так:

begin;
select * from jobs where process_id is null order by id asc limit 1;
update jobs set process_id = ? where id = ?; -- whatever i just got
commit;
-- return (id, message) to application, cleanup after done

Таблица и enqueue выглядят красиво, но dequeue kinda меня беспокоит. Насколько вероятно откат? Или заблокировать? Какие ключи я должен использовать, чтобы сделать это O (1) -ish?

Или есть ли лучшее решение, что я делаю?

4b9b3361

Ответ 1

Я построил несколько систем очередей сообщений, и я не уверен, в каком типе сообщений вы обращаетесь, но в случае dequeuing (это слово?) Я сделал то же самое, что и вы Сделано. Ваш метод выглядит простым, чистым и прочным. Не то, чтобы моя работа была лучшей, но она оказалась очень эффективной для большого мониторинга для многих сайтов. (регистрация ошибок, массовые маркетинговые кампании по электронной почте, уведомления о социальных сетях)

Мое голосование: не беспокойтесь!

Ответ 2

Ваш детектив может быть более кратким. Вместо того, чтобы полагаться на откат транзакции, вы можете сделать это в одном атомарном заявлении без явной транзакции:

UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;

Затем вы можете выполнять задания с помощью (скобки [] означают необязательные, в зависимости от ваших данных):

SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];

Ответ 3

Брайан Акер недавно говорил о очереди двигателя. Также обсуждался синтаксис SELECT table FROM DELETE.

Если вас не беспокоит пропускная способность, вы всегда можете использовать SELECT GET_LOCK() в качестве мьютекса. Например:

SELECT GET_LOCK('READQUEUE');
SELECT * FROM jobs;
DELETE FROM JOBS WHERE ID = ?;
SELECT RELEASE_LOCK('READQUEUE');

И если вы хотите получить действительно фантазию, оберните ее в хранимую процедуру.

Ответ 4

Я бы предложил использовать Quartz.NET

У этого есть поставщики для SQL Server, Oracle, MySql, SQLite и Firebird.

Ответ 5

Этот поток содержит информацию о дизайне, которая должна отображаться.

Цитата:

Вот то, что я успешно использовал в прошлом:

Схема таблицы MsgQueue

MsgId identity - NOT NULL
MsgTypeCode varchar (20) - NOT NULL
SourceCode varchar (20) - процесс вставки сообщения - NULLable
State char (1) - 'N'ew if queued,' A '(ctive), если обрабатывается,' C'полно, по умолчанию 'N' - NOT NULL
CreateTime datetime - default GETDATE() - NOT NULL
Msg varchar (255) - NULLable

Ваши типы сообщений - это то, что вы ожидаете, - сообщения, которые соответствуют контракту между вложением процесса (процессами) и процессом (процессами) чтения, структурированными с помощью XML или другим выбором представления (JSON будет удобен в некоторых случаи, например).

Затем могут быть вставлены процессы 0-to-n, и процессы 0-to-n могут считывать и обрабатывать сообщения. Каждый процесс чтения обычно обрабатывает один тип сообщения. Для балансировки нагрузки может выполняться несколько экземпляров типа процесса.

Читатель извлекает одно сообщение и изменяет состояние на "A" ctive, пока он работает на нем. Когда это будет сделано, он изменит состояние на "C". Он может удалить сообщение или нет в зависимости от того, хотите ли вы сохранить контрольный журнал. Сообщения состояния = 'N' вытягиваются в порядке MsgType/Timestamp, поэтому есть индекс в MsgType + State + CreateTime.

Варианты:
Состояние для "E" rror.
Колонка для кода процесса чтения.
Временные метки для переходов состояний.

Это обеспечило приятный, масштабируемый, видимый и простой механизм для выполнения ряда вещей, подобных описанию. Если у вас есть базовое понимание баз данных, оно довольно надежное и расширяемое. Никогда не было проблем с откатами блокировок и т.д. Из-за транзакций перехода атомарного состояния.

Ответ 6

Вот решение, которое я использовал, работая без process_id текущего потока или блокируя таблицу.

SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;

Получите результат в массиве $row и выполните:

DELETE from jobs WHERE ID=$row['ID'];

Затем получите затронутые строки (mysql_affected_rows). Если есть затронутые строки, обработайте задание в массиве $row. Если есть 0 затронутых строк, это означает, что другой процесс уже обрабатывает выбранное задание. Повторите описанные выше шаги, пока не будет строк.

Я тестировал это с таблицей "jobs", имеющей 100k строк, и создавал 20 параллельных процессов, которые делают выше. Никаких условий гонки не произошло. Вы можете изменить приведенные выше запросы, чтобы обновить строку с помощью флага обработки и удалить строку после того, как вы ее обработали:

while(time()-$startTime<$timeout)
{
SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1;
if (count($row)==0) break;
UPDATE jobs set processing=1 WHERE ID=$row['ID'];
if (mysql_affected_rows==0) continue;
//process your job here
DELETE from jobs WHERE ID=$row['ID'];
}

Излишне говорить, что для такой работы вы должны использовать надлежащую очередь сообщений (ActiveMQ, RabbitMQ и т.д.). Мы должны были прибегнуть к этому решению, хотя, поскольку наш хост регулярно ломает вещи при обновлении программного обеспечения, так что чем меньше материала, тем лучше.

Ответ 7

Вы можете иметь промежуточную таблицу для поддержания смещения для очереди.

create table scan(
  scan_id int primary key,
  offset_id int
);

У вас также может быть несколько сканирований, следовательно, одно смещение на сканирование. Инициализируйте offset_id = 0 в начале сканирования.

begin;
select * from jobs where order by id where id > (select offset_id from scan where scan_id = 0)  asc limit 1;
update scan set offset_id = ? where scan_id = ?; -- whatever i just got
commit;

Все, что вам нужно сделать, это просто сохранить последнее смещение. Это также сэкономит вам много места (process_id для каждой записи). Надеюсь, это звучит логично.