Подтвердить что ты не робот

Обеспечение порядка выполнения задач в threadpool

Я читал о шаблоне пула потоков, и я не могу найти обычное решение для следующей проблемы.

Я иногда хочу, чтобы задачи выполнялись последовательно. Например, я читаю фрагменты текста из файла и по какой-то причине мне нужны куски, которые будут обрабатываться в этом порядке. Поэтому в основном я хочу исключить concurrency для некоторых задач.

Рассмотрим этот сценарий, в котором задачи с * должны обрабатываться в том порядке, в котором они были введены. Другие задачи могут обрабатываться в любом порядке.

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

В контексте пула потоков, без этого ограничения, одна очередь ожидающих задач работает нормально, но явно здесь нет.

Я думал о том, что некоторые потоки работают в очереди с конкретным потоком, а остальные - в "глобальной" очереди. Затем, чтобы последовательно выполнять некоторые из задач, мне просто нужно вытолкнуть их в очередь, где выглядит один поток. Это звучит немного неуклюже.

Итак, настоящий вопрос в этой длинной истории: как бы вы это решили? Как бы вы гарантировали выполнение этих задач?

ИЗМЕНИТЬ

Как более общая проблема, предположим, что вышеприведенный сценарий становится

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

Я имею в виду, что задачи внутри группы должны выполняться последовательно, но сами группы могут смешиваться. Таким образом, вы можете иметь 3-2-5-4-7, например.

Еще одна вещь, которую нужно отметить, это то, что у меня нет доступа ко всем задачам в группе upfront (и я не могу дождаться, пока все они появятся до начала группы).

Спасибо за ваше время.

4b9b3361

Ответ 1

Что-то вроде следующего позволит поставить очередные и параллельные задачи в очередь, где последовательные задачи будут выполняться один за другим, а параллельные задачи будут выполняться в любом порядке, но параллельно. Это дает вам возможность сериализовать задачи там, где это необходимо, также иметь параллельные задачи, но делать это по мере получения заданий, то есть вам не нужно знать о всей последовательности вверх, порядок выполнения поддерживается динамически.

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    public int Count
    {
        get{lock (_syncObj){return _tasks.Count;}}
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            }
        }
    }

    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();

            OnTaskCompleted();
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}

Обновление

Чтобы обрабатывать группы задач с последовательностями последовательных и параллельных задач, GroupedTaskQueue может управлять TaskQueue для каждой группы. Опять же, вам не нужно знать о группах вверх, все они динамически управляются по мере получения заданий.

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;

        lock (_syncObj)
        {
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();

            OnTaskCompleted(group, queue);
        };

        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}

Ответ 2

Пулы потоков хороши для случаев, когда относительный порядок задач не имеет значения, при условии, что все они будут выполнены. В частности, должно быть хорошо, чтобы все они выполнялись параллельно.

Если ваши задачи должны выполняться в определенном порядке, то они не подходят для parallelism, поэтому пул потоков не подходит.

Если вы хотите перенести эти серийные задачи из основного потока, то для этих задач будет нужен один фоновый поток с очередью задач. Вы можете продолжать использовать пул потоков для остальных задач, которые подходят для parallelism.

Да, это означает, что вам нужно решить, куда отправить задание, в зависимости от того, является ли это задачей в порядке или "может быть распараллелизирована", но это не имеет большого значения.

Если у вас есть группы, которые должны быть сериализованы, но которые могут выполняться параллельно с другими задачами, тогда у вас есть несколько вариантов:

  • Создайте отдельную задачу для каждой группы, которая выполняет соответствующие групповые задачи, и отправьте эту задачу в пул потоков.
  • У каждой задачи в группе явно ждут предыдущую задачу в группе и отправляют ее в пул потоков. Это требует, чтобы ваш пул потоков мог обрабатывать случай, когда поток ожидает еще не запланированную задачу без блокировки.
  • Имейте выделенный поток для каждой группы и отправляйте групповые задачи в соответствующую очередь сообщений.

Ответ 3

Чтобы сделать то, что вы хотите сделать с помощью threadpool, вам может понадобиться создать какой-то планировщик.

Что-то вроде этого:

TaskQueue → Планировщик → Очередь → ThreadPool

Планировщик работает в своем потоке, сохраняя следы зависимостей между заданиями. Когда работа готова к выполнению, планировщик просто подталкивает ее в очередь для потока.

ThreadPool, возможно, придется отправлять сигналы Планировщику, чтобы указать, когда задание выполнено, чтобы планировщик мог помещать задания в зависимости от этого задания в очередь.

В вашем случае зависимости могут быть, вероятно, сохранены в связанном списке.

Скажем, у вас есть следующие зависимости: 3 → 4 → 6 → 8

Задание 3 работает в threadpool, у вас все еще нет идей о том, что работа 8 существует.

Задание 3 завершается. Вы удаляете 3 из связанного списка, вы помещаете задание 4 в очередь в threadpool.

Прибытие 8. Вы поместите его в конец связанного списка.

Единственными конструкциями, которые должны быть полностью синхронизированы, являются очереди до и после планировщика.

Ответ 4

В принципе, существует ряд ожидающих задач. Некоторые из задач могут выполняться только при завершении выполнения одной или нескольких других ожидающих задач.

Отложенные задачи могут быть смоделированы в графике зависимостей:

  • "task 1 → task2" означает, что задача 2 может быть выполнена только после завершения задачи 1 ". стрелки указывают направление выполнения.
  • неопределенность задачи (количество задач, указывающих на нее) определяет, готова ли задача к выполнению. Если indegree равно 0, он может быть выполнен.
  • иногда задача должна ждать завершения нескольких задач, тогда значение indegree будет > 1.
  • Если задание не нужно ждать завершения других задач (его значение равно нулю), он может быть отправлен в пул потоков с рабочими потоками или в очередь с задачами, ожидающими, когда их подхватит рабочий нить. Вы знаете, что поставленная задача не приведет к тупиковой ситуации, потому что задача ничего не ждет. В качестве оптимизации вы можете использовать очередь приоритетов, например. в котором будут выполняться задачи, из которых больше зависят задачи в зависимости от зависимости. Это также не может вызвать тупик, поскольку все задачи в пуле потоков могут выполняться. Однако это может спровоцировать голод.
  • Если задача завершает выполнение, ее можно удалить из графика зависимостей, возможно, уменьшив независимость других задач, которые, в свою очередь, могут быть отправлены в пул рабочих потоков.

Таким образом, существует (по крайней мере) один поток, используемый для добавления/удаления отложенных задач, и есть пул потоков рабочих потоков.

Когда задача добавляется в график зависимостей, вы должны проверить:

  • как задача связана с графиком зависимостей: какие задачи нужно ждать до конца и какие задачи должны ждать завершения? Нарисуйте соединения от и до новой задачи соответственно.
  • после того, как соединения будут нарисованы: появились ли новые подключения в цикле зависимостей? Если это так, возникает ситуация взаимоблокировки.

Производительность

  • этот шаблон медленнее, чем последовательное выполнение, если параллельное выполнение на самом деле редко бывает возможным, потому что вам нужно дополнительное администрирование, чтобы делать все почти последовательно в любом случае.
  • этот шаблон выполняется быстро, если многие задачи могут выполняться одновременно на практике.

Предположения

Как вы, возможно, читали между строками, вы должны проектировать задачи так, чтобы они не мешали другим задачам. Кроме того, должен быть способ определить приоритет задач. Приоритет задачи должен включать данные, обрабатываемые каждой задачей. Две задачи не могут одновременно изменять один и тот же объект; одна из задач должна получить приоритет над другим, или выполняемые операции над объектом должны быть потокобезопасными.

Ответ 5

Если я правильно понимаю проблему, у исполнителей jdk нет этой возможности, но ее легко сворачивать. Вам в основном нужно

  • пул рабочих потоков, каждый из которых имеет выделенную очередь
  • некоторая абстракция над теми очередями, к которым вы предлагаете работу (c.f. the ExecutorService)
  • некоторый алгоритм, который детерминистически выбирает конкретную очередь для каждой части работы.
  • каждая часть работы затем получает предложения в нужную очередь и, следовательно, обрабатывается в правильном порядке

Разница с исполнителями jdk заключается в том, что они имеют 1 очередь с n потоками, но вы хотите n очередей и m потоков (где n может или не может равняться m)

* редактировать после чтения, что каждая задача имеет ключ *

Немного подробнее

  • напишите код, который преобразует ключ в индекс (int) в заданном диапазоне (0-n, где n - количество требуемых потоков), это может быть так же просто, как key.hashCode() % n, или это может быть некоторая статическое отображение известных значений ключей в потоки или все, что вы хотите
  • при запуске
    • создайте n очередей, поместите их в индексированную структуру (массив, перечислите все)
    • начать n потоков, каждый поток просто выполняет блокировку из очереди
    • когда он получает какую-то работу, он знает, как выполнять работу, специфичную для этой задачи/события (очевидно, вы можете иметь некоторое сопоставление задач с действиями, если у вас есть гетерогенные события)
  • сохраните это за некоторым фасадом, который принимает рабочие элементы
  • Когда задача прибывает, передайте ее на фасад
    • фасад находит правильную очередь для задачи на основе ключа, предлагает ее этой очереди

достаточно просто добавить автоматический перезапуск рабочих потоков к этой схеме, вам просто нужно, чтобы рабочий поток зарегистрировался у какого-то менеджера, чтобы указать "Я владею этой очередью", а затем некоторое домашнее хозяйство вокруг этого + обнаружения ошибок в потоке ( что означает, что он не регистрирует право собственности на эту очередь, возвращая очередь в свободный пул очередей, который является триггером для запуска нового потока).

Ответ 6

Вариант 1 - комплексный

Поскольку у вас есть последовательные задания, вы можете собрать эти задания в цепочке и позволить самим заданиям повторно отправлять пул потоков после их завершения. Предположим, у нас есть список заданий:

 [Task1, ..., Task6]

как в вашем примере. У нас есть последовательная зависимость, такая, что [Task3, Task4, Task6] - цепочка зависимостей. Теперь мы выполняем работу (псевдокод Erlang):

 Task4Job = fun() ->
               Task4(), % Exec the Task4 job
               push_job(Task6Job)
            end.
 Task3Job = fun() ->
               Task3(), % Execute the Task3 Job
               push_job(Task4Job)
            end.
 push_job(Task3Job).

То есть мы изменяем задание Task3, перенося его на задание, которое в качестве продолжения подталкивает следующее задание в очереди в пул потоков. Существуют сильные сходства с общим стилем прохождения продолжения, который также встречается в системах типа Node.js или Pythons Twisted.

Обобщая, вы создаете систему, в которой вы можете определить цепочки заданий, которые могут defer продолжить работу и повторно отправить дальнейшую работу.

Вариант 2 - простой

Почему мы даже беспокоимся о разделении рабочих мест? Я имею в виду, поскольку они последовательно зависят, выполнение всех из них в одном и том же потоке будет не быстрее или медленнее, чем принимать эту цепочку и распространять ее на несколько потоков. Предполагая "достаточную" рабочую нагрузку, любой поток всегда будет работать в любом случае, поэтому просто связывать задания вместе, возможно, проще всего:

  Task = fun() ->
            Task3(),
            Task4(), 
            Task6()  % Just build a new job, executing them in the order desired
         end,
  push_job(Task).

Очень легко делать такие вещи, если у вас есть функции первоклассных граждан, чтобы вы могли создавать их на своем языке по своему усмотрению, например, любой язык программирования, Python, Ruby-blocks - и т.д.

Мне не очень нравится идея создания очереди или стека продолжения, как в "Варианте 1", хотя я бы определенно пошел со вторым вариантом. В Erlang у нас даже есть программы под названием jobs, написанные Erlang Solutions и выпущенные как Open Source. jobs создан для выполнения и загрузки, регулирующих выполнение заданий, подобных этим. Я бы, вероятно, объединил вариант 2 с заданиями, если бы я решил эту проблему.

Ответ 7

Ответы, предлагающие не использовать пул потоков, подобны жесткому кодированию знаний о зависимостях задач/порядке выполнения. Вместо этого я бы создал CompositeTask, который управляет зависимостью начала и конца между двумя задачами. Инкапсулируя зависимость за интерфейсом задачи, все задачи можно обрабатывать равномерно и добавлять в пул. Это скрывает детали выполнения и позволяет изменять зависимости задачи, не влияя на использование пула потоков.

В вопросе не указывается язык - я буду использовать Java, который, я надеюсь, доступен для большинства.

class CompositeTask implements Task
{
    Task firstTask;
    Task secondTask;

    public void run() {
         firstTask.run();
         secondTask.run();
    }
}

Выполняет задачи последовательно и в одном потоке. Вы можете связать множество CompositeTask вместе, чтобы создать последовательность из нескольких последовательных задач по мере необходимости.

Недостатком здесь является то, что это связывает поток на протяжении всех выполняемых последовательно задач. У вас могут быть другие задачи, которые вы бы предпочли выполнить между первой и второй задачами. Таким образом, вместо выполнения второй задачи напрямую, выполните составную задачу расписания выполнения второй задачи:

class CompositeTask implements Runnable
{
    Task firstTask;
    Task secondTask;
    ExecutorService executor;

    public void run() {
         firstTask.run();
         executor.submit(secondTask);
    }
}

Это гарантирует, что вторая задача не будет выполняться до завершения первой задачи, а также позволяет пулу выполнять другие (возможно, более срочные) задачи. Обратите внимание, что первая и вторая задачи могут выполняться в отдельных потоках, поэтому, хотя они не выполняются одновременно, любые общие данные, используемые задачами, должны быть видимыми для других потоков (например, путем создания переменных volatile.)

Это простой, но мощный и гибкий подход и позволяет самим задачам определять ограничения выполнения, а не делать это, используя разные пулы потоков.

Ответ 8

Используйте два Активные объекты. В двух словах: активный шаблон объекта состоит из очереди приоритетов и 1 или многих рабочих потоков, которые могут получать задачи из очереди и обрабатывать ее.

Итак, используйте один активный объект с одним рабочим потоком: все задачи, которые были бы местами в очереди, будут обрабатываться последовательно. Используйте второй активный объект с количеством рабочего потока более 1. В этом случае рабочие потоки будут получать и обрабатывать задачи из очереди в любом порядке.

Luck.

Ответ 9

Я думаю, что пул потоков можно эффективно использовать в этой ситуации. Идея состоит в том, чтобы использовать отдельный объект strand для каждой группы зависимых задач. Вы добавляете задачи в свою очередь с помощью или без strand объекта. Вы используете тот же объект strand с зависимыми задачами. Ваш планировщик проверяет, имеет ли следующая задача strand, и если этот strand заблокирован. Если нет - заблокируйте этот strand и запустите эту задачу. Если strand уже заблокировано, сохраните эту задачу в очереди до следующего события планирования. Когда задача выполнена, разблокируйте ее strand.

В результате вам нужна одиночная очередь, вам не нужны никакие дополнительные потоки, не сложные группы и т.д. strand объект может быть очень простым с помощью двух методов lock и unlock.

Я часто встречаю ту же проблему дизайна, например. для асинхронного сетевого сервера, который обрабатывает несколько одновременных сеансов. Сеансы независимы (это сопоставляет их с вашими независимыми задачами и группами зависимых задач), когда задачи внутри сеансов зависят (это сопоставляет внутренние задачи сеанса с вашими зависимыми задачами внутри группы). Используя описанный подход, я полностью исключаю явную синхронизацию внутри сеанса. Каждый сеанс имеет собственный объект strand.

И более того, я использую существующую (большую) реализацию этой идеи: Boost Asio library (С++). Я просто использовал термин strand. Реализация элегантна: я переношу свои задачи async в соответствующий объект strand, прежде чем планировать их.

Ответ 10

Я думаю, вы смешиваете понятия. Threadpool в порядке, если вы хотите распространять некоторую работу среди потоков, но если вы начнете смешивать зависимости между потоками, тогда это не такая хорошая идея.

Мой совет, просто не используйте threadpool для этих задач. Просто создайте выделенный поток и сохраните простую очередь последовательных элементов, которые должны обрабатываться только этим потоком. Затем вы можете продолжать задавать задачи в пуле потоков, если у вас нет последовательного требования, и используйте выделенный поток, когда у вас есть.

Пояснение: Используя здравый смысл, очередь последовательных задач должна выполняться одним потоком, обрабатывающим каждую задачу один за другим:)

Ответ 11

Как бы вы гарантировали выполнение этих задач?

push task1
push task2
push task346
push task5

В ответ на редактирование:

push task1
push task27   **
push task3468   *
push task5
push task9

Ответ 12

Это достижимо, хорошо, насколько я понимаю ваш сценарий. В основном то, что вам нужно, это сделать что-то умное, чтобы координировать свои задачи в основном потоке. Java API: ExecutorCompletionService и Callable

Сначала выполните свою вызываемую задачу:

public interface MyAsyncTask extends Callable<MyAsyncTask> {
  // tells if I am a normal or dependent task
  private boolean isDependent;

  public MyAsyncTask call() {
    // do your job here.
    return this;
  }
}

Затем в основном потоке используйте CompletionService для координации выполнения зависимой задачи (т.е. механизма ожидания):

ExecutorCompletionService<MyAsyncTask> completionExecutor = new 
  ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5));
Future<MyAsyncTask> dependentFutureTask = null;
for (MyAsyncTask task : tasks) {
  if (task.isNormal()) {
    // if it is a normal task, submit it immediately.
    completionExecutor.submit(task);
  } else {
    if (dependentFutureTask == null) {
      // submit the first dependent task, get a reference 
      // of this dependent task for later use.
      dependentFutureTask = completionExecutor.submit(task);
    } else {
      // wait for last one completed, before submit a new one.
      dependentFutureTask.get();
      dependentFutureTask = completionExecutor.submit(task);
    }
  }
}

Таким образом, вы используете один исполнитель (размер потока 5), выполняющий как обычные, так и зависимые задачи, нормальная задача выполняется сразу же после отправки, зависимые задачи выполняются один за другим (ожидание выполняется в основном потоке путем вызова get() в Future перед отправкой новой зависимой задачи), поэтому в любой момент времени у вас всегда есть ряд нормальных задач и одна зависимая задача (если существует), запущенная в одном поточном пуле.

Это просто начало, используя ExecutorCompletionService, FutureTask и Semaphore, вы можете реализовать более сложный сценарий координации потоков.

Ответ 13

У вас есть два разных типа задач. Смешение их в одной очереди кажется довольно странным. Вместо того, чтобы одна очередь имела две. Для простоты вы даже можете использовать ThreadPoolExecutor для обоих. Для последовательных задач просто назначьте фиксированный размер 1, для задач, которые могут быть выполнены, одновременно дают больше. Я не понимаю, почему это было бы неуклюже. Держите его простым и глупым. У вас есть две разные задачи, поэтому относитесь к ним соответствующим образом.

Ответ 14

Поскольку вам нужно только дождаться завершения одной задачи перед запуском зависимой задачи, ее можно легко выполнить, если вы можете запланировать зависимую задачу в первой задаче. Итак, во втором примере: в конце задачи 2, задача планирования 7 а также в конце задачи 3, выполнить задачу 4 и т.д. для 4- > 6 и 6- > 8.

В начале просто планируйте задачи 1,2,5,9... и остальные должны следовать.

Еще более общая проблема заключается в том, что вам нужно подождать несколько задач, прежде чем может начаться зависимая задача. Эффективное обращение с ним является нетривиальным упражнением.

Ответ 15

Для этой цели существует java-инфраструктура, называемая dexecutor (отказ от ответственности: я являюсь владельцем)

DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();

    executor.addDependency("task1", "task2");
    executor.addDependency("task4", "task6");
    executor.addDependency("task6", "task8");

    executor.addIndependent("task3");
    executor.addIndependent("task5");
    executor.addIndependent("task7");

    executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);

task1, task3, task5, task7 выполняется параллельно (в зависимости от размера пула потоков), как только task1 заканчивается, task2 запускается, как только task2 заканчивает выполнение задач4, как только task4 завершает задачу task6, и, наконец, однажды task6 завершает выполнение задачи8.

Ответ 16

Было много ответов, и, очевидно, это было принято. Но почему бы не использовать продолжения?

Если у вас есть известное "серийное" условие, тогда, когда вы ставите в очередь первую задачу с этим условием, держите Задачу; и для дальнейших задач вызывается Task.ContinueWith().

public class PoolsTasks
{
    private readonly object syncLock = new object();
    private Task serialTask = Task.CompletedTask;


    private bool isSerialTask(Action task) {
        // However you determine what is serial ...
        return true;
    }

    public void RunMyTask(Action myTask) {
        if (isSerialTask(myTask)) {
            lock (syncLock)
                serialTask = serialTask.ContinueWith(_ => myTask());
        } else
            Task.Run(myTask);
    }
}