Подтвердить что ты не робот

Как обрабатывать сообщения MSMQ параллельно

Я пишу службу Windows для использования сообщений MSMQ. Служба будет иметь периоды высокой активности (80 тыс. Сообщений поступают очень быстро) и длительные периоды бездействия (может быть несколько дней без нового сообщения).

Обработка сообщений очень привязана к сети, поэтому я получаю большую выгоду из parallelism. Но в периоды бездействия я не хочу связывать цепочку потоков, ожидающих сообщений, которые не придут в ближайшее время.

Интерфейс MSMQ, похоже, очень сфокусирован на синхронном рабочем процессе - получает одно сообщение, обрабатывает его, получает другое и т.д. Как мне структурировать свой код, чтобы я мог использовать parallelism в периоды высокой активности, но не связывать кучу нитей в периоды отсутствия активности? Бонусные баллы за использование TPL. Псевдокод будет оценен.

4b9b3361

Ответ 1

На протяжении многих лет я выделил MSMQ (включая мобильные реализации), и вы правильно указали "синхронный рабочий процесс". Это не значит, что вы не можете принимать различные конверты сообщений и обрабатывать их через разные ядра через TPL... ограничивающим фактором является чтение/запись в очередь... по сути это последовательная операция. Вы не можете отправить 8 сообщений одновременно (например, компьютер с 8 ядрами).

У меня была аналогичная потребность (без использования пространства имен System.Messaging) и решила ее с некоторой помощью из книги, которую я прочитал "Параллельное программирование с помощью Microsoft.NET" Кэмпбелла и Джонсона.

Ознакомьтесь с главой "параллельные задачи" и, в частности, частью глобальной очереди, которая взаимодействует с локальными очередями для каждого потока для обработки работы (т.е. TPL), которые используют алгоритм "кражи работы" для выполнения балансировки нагрузки. Я смоделировал свое решение, частично, после их примера. Окончательная версия моей системы имела огромную разницу в ее производительности (от 23 сообщений в секунду до более 200).

В зависимости от того, сколько времени займет ваша система от 0 до 80 000, вы захотите использовать тот же дизайн и распространить его на несколько серверов (каждый с несколькими процессорами и несколькими ядрами). Теоретически для моей настройки потребуется немного меньше 7 минут, чтобы отполировать все 80K, поэтому, добавив второй компьютер, он сократит это примерно до ~ 3 минут и 20 секунд и т.д. И т.д. И т.д. Трюк - это логическая обработка воровства.

Пища для размышлений...

Быстрое редактирование: BTW компьютер - это рабочая станция Dell T7500 с двухъядерным четырехъядерным процессором Xeons @3GHz, 24 ГБ оперативной памяти, 64-разрядная версия Windows 7 Ultimate.

Ответ 2

Вот упрощенная версия того, что я сделал:

while(true) {
    int msgCount = 0;

    Parallel.ForEach(Enumerable.Range(0,20), (i) => {
        MessageQueue queue = new MessageQueue(_queuePath);

        try {
            msg = queue.Receive(TimeSpan.Zero);
            // do work

            Interlocked.Increment(ref msgCount);
        catch(MessageQueueException mqex) {
            if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) {
                return; // nothing in queue
            }
            else throw;
        }           
    }

    if (msgCount < 20) 
        Thread.Sleep(1000); // nothing more to do, take a break
}

Поэтому я стараюсь получать сообщения 20 за раз, считая те, которые я получаю. Для тех 20, я позволил TPL пойти в город. В конце, если я обработал менее 20 сообщений, очередь пуста, и я сплю поток в течение секунды, прежде чем повторять попытку.

Ответ 3

NServiceBus имеет отличную концепцию для этой самой проблемы. Он назывался Distributor. Идея заключается в том, что дистрибьютор может перенаправить выполняемую работу и распределить ее по любому количеству запущенных дочерних узлов. В зависимости от типа выполняемой работы, например, тяжелые вычисления и записи на диски, вы можете распространять их на несколько процессов или даже на несколько машин.

Ответ 4

Решение также частично зависит от того, как обрабатываются сообщения.

Я использовал WorkflowService, размещенный в Windows Server AppFabric с привязкой Net.Msmq и транзакционной очередью. Транзакционная привязка net.msmq была необходима для обработки обработки сообщения не в порядке. Рабочий процесс - это конечный автомат .Net 4.0.1, и сообщения поступают в одну очередь из разных систем. Возможно, например, чтобы одна система отправила обновление экземпляру конечного автомата, прежде чем другая система отправила сообщение для его создания. Чтобы разрешить обработку сообщения не в порядке, хост службы рабочего процесса использует BufferedReceive для блокировки сообщений и неоднократно повторяет попытку получения их из блокировки. BufferedReceive имеет максимальные ожидающие сообщения, установленные на максимальную вероятную длину партии, потому что сообщения в заблокированной очереди возвращаются в очередь повторов в начале.

У WF также есть несколько настроек дросселирования. Моя максимальная длина пакета составляет около 20000. Я установил MaxConcurrentCalls равным 64, а MaxConcurrentInstances - 20000. Это приводит к тому, что IIS/WAS обрабатывает 64 одновременных вызова.

Однако, это и есть то, потому что Receives в рабочем процессе односторонне, это не означает, что порожденные WF-процессы завершаются, как только Receive завершает. Что произойдет в следующем сценарии, так это то, что после того, как сообщение было отменено, и вызвал экземпляр WF, который является одним из 64 вызовов, ряд рабочих шагов запланирован механизмом рабочего процесса, и одна из них - операция базы данных.

В результате получается, что 64 вызова могут быть максимальными, но если скорость потока сообщений выше, чем скорость завершения асинхронного процесса, по мере обработки входящей партии сообщений будет большее количество выполняемых потоков (в мой случай WF экземпляров). Это может привести к непредвиденным событиям, например, пул соединений ADO.NET по умолчанию равен 100 в качестве максимального количества подключений. Это вызовет переполнение процессов в ожидании соединений из исчерпанного пула. Для этой конкретной задачи вы можете либо повысить значение MaxPoolSize, либо использовать Service Broker для асинхронного взаимодействия с операциями db (что означает большую сложность рабочего процесса).

Надеюсь, это поможет кому-то.

Ответ 5

Вот как я это делаю (меняя код на лету, чтобы могли быть орфографические ошибки):

for (int i = 0; i < numberOfSimultaneousRequests; i++)
            priorityQueue.BeginReceive(TimeSpan.FromDays(30), state, callback);

и обратный вызов выглядит примерно так:

private void ProcessMessage(IAsyncResult asyncResult)
    {
        try
        {
            Message msg = priorityQueue.EndReceive(asyncResult);
            //Do something with the message
        }
        finally
        {
            priorityQueue.BeginDequeue(null, ProcessMessage);//start processing another one
        }

Ответ 6

Просто как-то похоже, tpl, похоже, может использовать какое-то исключение безопасности потока, где он сталкивается с физическими проблемами, например, попробуйте создать sqlconnection за пределами tpl foreach и использовать его внутри тела цикла - он выбрал исключение для меня. Я новичок в очереди, прежде чем войти в тело, перечислив список строк и, казалось, все в порядке, мой код обрабатывал 10000 элементов последовательно менее 500 мс, используя 1way-обмен сообщениями на i7 2500 8gb и локальном msmq