Рассмотрим очередь с лотом заданий, требующих обработки. Ограничение очереди может только получить 1 задание за раз и никак не знать, сколько рабочих мест есть. Работы занимают 10 секунд для завершения и привлечения большого количества ожиданий от ответов от веб-служб, поэтому это не связано с ЦП.
Если я использую что-то вроде этого
while (true)
{
var job = Queue.PopJob();
if (job == null)
break;
Task.Factory.StartNew(job.Execute);
}
Затем он будет яростно пополнять задания из очереди намного быстрее, чем может их завершить, исчерпать память и упасть на задницу.. > & Л;
Я не могу использовать (я не думаю) ParallelOptions.MaxDegreeOfParallelism, потому что я не могу использовать Parallel.Invoke или Parallel.ForEach
3 альтернативы, которые я нашел
-
Замените Task.Factory.StartNew на
Task task = new Task(job.Execute,TaskCreationOptions.LongRunning) task.Start();
Что, кажется, несколько решает проблему, но я не ясно, что это делает, и если это лучший метод.
-
Создайте настраиваемый планировщик задач который ограничивает степень concurrency
-
Используйте что-то вроде BlockingCollection, чтобы добавлять задания в коллекцию при запуске и удалять, когда закончите, чтобы ограничить число, которое может быть запущено.
С# 1 мне нужно верить, что правильное решение автоматически принято, # 2/# 3 Мне нужно решить максимальное количество задач, которые могут выполняться сами.
Я понял это правильно - что лучше, или есть другой способ?
РЕДАКТИРОВАТЬ. Это то, что я получил из приведенных ниже ответов, шаблон производителя-потребителя.
Как и общая пропускная способность, задача заключалась не в том, чтобы выполнить деактивацию заданий быстрее, чем можно было обработать, и не иметь очередь опроса нескольких потоков (не показано здесь, но это неблокирующий op и приведет к огромным трансакционным издержкам, если опросить на высокой частоте с несколько мест).
// BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
// point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);
// Setup a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time
// in job is blocked waiting IO then likely be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
Thread consumer = new Thread(() =>
{
while (!jobs.IsCompleted)
{
var job = jobs.Take();
job.Execute();
}
}
consumer.Start();
}
// Producer to take items of queue and put in blocking collection ready for processing
while (true)
{
var job = Queue.PopJob();
if (job != null)
jobs.Add(job);
else
{
jobs.CompletedAdding()
// May need to wait for running jobs to finish
break;
}
}