Подтвердить что ты не робот

Лучший способ ограничить количество активных задач, запущенных через параллельную библиотеку задач

Рассмотрим очередь с лотом заданий, требующих обработки. Ограничение очереди может только получить 1 задание за раз и никак не знать, сколько рабочих мест есть. Работы занимают 10 секунд для завершения и привлечения большого количества ожиданий от ответов от веб-служб, поэтому это не связано с ЦП.

Если я использую что-то вроде этого

while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}

Затем он будет яростно пополнять задания из очереди намного быстрее, чем может их завершить, исчерпать память и упасть на задницу.. > & Л;

Я не могу использовать (я не думаю) ParallelOptions.MaxDegreeOfParallelism, потому что я не могу использовать Parallel.Invoke или Parallel.ForEach

3 альтернативы, которые я нашел

  • Замените Task.Factory.StartNew на

    Task task = new Task(job.Execute,TaskCreationOptions.LongRunning)
    task.Start();
    

    Что, кажется, несколько решает проблему, но я не ясно, что это делает, и если это лучший метод.

  • Создайте настраиваемый планировщик задач который ограничивает степень concurrency

  • Используйте что-то вроде BlockingCollection, чтобы добавлять задания в коллекцию при запуске и удалять, когда закончите, чтобы ограничить число, которое может быть запущено.

С# 1 мне нужно верить, что правильное решение автоматически принято, # 2/# 3 Мне нужно решить максимальное количество задач, которые могут выполняться сами.

Я понял это правильно - что лучше, или есть другой способ?

РЕДАКТИРОВАТЬ. Это то, что я получил из приведенных ниже ответов, шаблон производителя-потребителя.

Как и общая пропускная способность, задача заключалась не в том, чтобы выполнить деактивацию заданий быстрее, чем можно было обработать, и не иметь очередь опроса нескольких потоков (не показано здесь, но это неблокирующий op и приведет к огромным трансакционным издержкам, если опросить на высокой частоте с несколько мест).

// BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
// point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

// Setup a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time
// in job is blocked waiting IO then likely be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
   Thread consumer = new Thread(() =>
   {
      while (!jobs.IsCompleted)
      {
         var job = jobs.Take();
         job.Execute();
      }
   }
   consumer.Start();
}

// Producer to take items of queue and put in blocking collection ready for processing
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompletedAdding()
       // May need to wait for running jobs to finish
       break;
    }
}
4b9b3361

Ответ 1

Я только что дал ответ, который очень применим к этому вопросу.

В принципе, класс задач TPL предназначен для планирования работы с ЦП. Он не предназначен для блокировки работы.

Вы работаете с ресурсом, который не является ЦП: ожидание ответов на службы. Это означает, что TPL будет неправильно использовать ваш ресурс, поскольку в определенной степени предполагает ограниченность ЦП.

Управлять ресурсами самостоятельно: Начать фиксированное количество потоков или задач LongRunning (что в основном одинаково). Определите количество потоков эмпирически.

Вы не можете создавать ненадежные системы. По этой причине я рекомендую # 1, но дросселирую. Не создавайте столько потоков, сколько есть рабочих элементов. Создайте столько потоков, которые необходимы для насыщения удаленной службы. Напишите себе вспомогательную функцию, которая генерирует N потоков и использует их для обработки M рабочих элементов. Таким образом, вы получаете вполне предсказуемые и надежные результаты.

Ответ 2

Потенциальные расщепления потока и продолжения, вызванные await, позже в вашем коде или в сторонней библиотеке, не будут хорошо воспроизводиться с длинными работами (или потоками), поэтому не беспокойтесь о том, чтобы использовать долго выполняемые задачи. В мире async/await они бесполезны. Подробнее здесь.

Вы можете позвонить ThreadPool.SetMaxThreads, но прежде чем выполнять этот вызов, убедитесь, что вы установили минимальное количество потоков с помощью ThreadPool.SetMinThreads, используя значения ниже или равные максимальным. И, кстати, документация MSDN неверна. Вы можете пойти ниже числа ядер на вашем компьютере с этими вызовами методов, по крайней мере, в .NET 4.5 и 4.6, где я использовал этот метод для уменьшения вычислительной мощности 32-разрядной службы с ограниченным объемом памяти.

Если, однако, вы не хотите ограничивать все приложение, а только часть его обработки, это будет выполнять пользовательский планировщик задач. Давным-давно MS выпустила samples с несколькими настраиваемыми планировщиками задач, включая LimitedConcurrencyLevelTaskScheduler. Задайте основную задачу обработки вручную с помощью Task.Factory.StartNew, предоставляя персонализированный планировщик задач, и каждая другая задача, порожденная им, будет использовать ее, включая async/await и даже Task.Yield, используемые для достижения асинхронности на ранней стадии в методе async.

Но для вашего конкретного случая оба решения не перестанут изматывать вашу очередь заданий, прежде чем завершать их. Это может быть нежелательно, в зависимости от реализации и назначения вашей очереди. Они больше похожи на "пожар кучу задач и позволяют планировщику найти время для их выполнения". Поэтому, возможно, что-то более подходящее здесь может быть более строгим методом контроля над выполнением заданий с помощью semaphores. Код будет выглядеть так:

semaphore = new SemaphoreSlim(max_concurrent_jobs);

while(...){
 job = Queue.PopJob();
 semaphore.Wait();
 ProcessJobAsync(job);
}

async Task ProcessJobAsync(Job job){
 await Task.Yield();
 ... Process the job here...
 semaphore.Release();
}

Там более одного способа скинуть кошку. Используйте то, что, по вашему мнению, уместно.

Ответ 3

Проблема здесь, похоже, не слишком много запусков Task s, слишком много запланированных Task s. Ваш код будет пытаться планировать как можно больше Task, независимо от того, насколько быстро они выполняются. И если у вас слишком много заданий, это означает, что вы получите OOM.

Из-за этого ни одно из предлагаемых вами решений не решит вашу проблему. Если кажется, что простое определение LongRunning решает вашу проблему, то, скорее всего, потому, что для создания нового Thread (что и есть LongRunning) требуется некоторое время, которое эффективно дросселирует получение новых заданий. Таким образом, это решение работает только случайно, и, скорее всего, это приведет к другим проблемам позже.

Что касается решения, я в основном согласен с usr: простейшее решение, которое работает достаточно хорошо, - это создать фиксированное количество задач LongRunning и иметь один цикл, который вызывает Queue.PopJob() (защищен lock, если этот метод не является потокобезопасным) и Execute() задание.

ОБНОВЛЕНИЕ: После нескольких размышлений я понял, что следующая попытка, скорее всего, будет вести себя ужасно. Используйте его, только если вы действительно уверены, что он будет хорошо работать для вас.


Но TPL пытается выяснить лучшую степень parallelism, даже для IO-bound Task s. Таким образом, вы можете попытаться использовать это в своих интересах. Long Task здесь не будет работать, потому что с точки зрения TPL кажется, что никакой работы не сделано, и она начнет новый Task снова и снова. Вместо этого вы можете начать новый Task в конце каждого Task. Таким образом, TPL будет знать, что происходит, и его алгоритм может работать хорошо. Кроме того, чтобы TPL определил степень parallelism, в начале Task, которая первой в своей строке, запустите еще одну строку Task s.

Этот алгоритм может работать хорошо. Но также возможно, что TPL сделает плохое решение относительно степени parallelism, я на самом деле ничего подобного не пробовал.

В коде это будет выглядеть так:

void ProcessJobs(bool isFirst)
{
    var job = Queue.PopJob(); // assumes PopJob() is thread-safe
    if (job == null)
        return;

    if (isFirst)
        Task.Factory.StartNew(() => ProcessJobs(true));

    job.Execute();

    Task.Factory.StartNew(() => ProcessJob(false));
}

И начните с

Task.Factory.StartNew(() => ProcessJobs(true));

Ответ 4

В Microsoft есть очень классная библиотека под названием DataFlow, которая делает именно то, что вы хотите (и многое другое). Подробнее здесь.

Вы должны использовать класс ActionBlock и установить MaxDegreeOfParallelism объекта ExecutionDataflowBlockOptions. ActionBlock отлично работает с async/await, поэтому даже когда ваши внешние вызовы ждут, никакие новые задания не начнут обрабатываться.

ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions
{
     MaxDegreeOfParallelism = 10
};

this.sendToAzureActionBlock = new ActionBlock<List<Item>>(async items => await ProcessItems(items),
            actionBlockOptions);
...
this.sendToAzureActionBlock.Post(itemsToProcess)

Ответ 5

TaskCreationOptions.LongRunning полезен для блокировки задач, и использование здесь здесь является законным. Что он делает, так это предлагает планировщику посвятить поток задаче. Сам планировщик пытается сохранить количество потоков на одном уровне с количеством ядер ЦП, чтобы избежать чрезмерного переключения контекста.

Это хорошо описано в Threading in С# by Joseph Albahari

Ответ 6

Я использую механизм очереди сообщений/почтового ящика для достижения этого. Это похоже на актерскую модель. У меня есть класс, который имеет MailBox. Я называю этот класс своим "рабочим". Он может получать сообщения. Эти сообщения помещаются в очередь, и они, по сути, определяют задачи, которые я хочу, чтобы рабочий выполнял. Работник будет использовать Task.Wait() для завершения своей задачи до удаления очередного сообщения и начала следующей задачи.

Ограничивая число рабочих, которых у меня есть, я могу ограничить количество одновременных потоков/задач, которые выполняются.

Это изложено, с исходным кодом, в моем сообщении в блоге о распределенном вычислительном движке. Если вы посмотрите на код для IActor и WorkerNode, я надеюсь, что это имеет смысл.

https://long2know.com/2016/08/creating-a-distributed-computing-engine-with-the-actor-model-and-net-core/