Подтвердить что ты не робот

Создание асинхронной цепочки обратного вызова `future` из графика зависимостей времени компиляции (DAG)

У меня есть аймический график ациклический график асинхронных задач. DAG показывает зависимости между задачами: анализируя его, можно понять, какие задачи могут выполняться параллельно (в отдельных потоках) и какие задачи должны ждать завершения других задач до их начала (зависимости).

Я хочу создать цепочку обратного вызова из DAG, используя вспомогательные функции продолжения boost::future и .then(...), when_all(...). Результатом этого поколения будет функция, которая при вызове начнет цепочку обратного вызова и выполнит задачи, описанные в DAG, выполнив как можно больше задач параллельно.

У меня возникла проблема, однако, найти общий алгоритм, который может работать для всех случаев.

Я сделал несколько рисунков, чтобы облегчить понимание проблемы. Это легенда, которая покажет вам, что означают символы на чертежах:

Легенда: как читать изображения.

Начнем с простой, линейной DAG:

Пример 0: линейная DAG.

Этот график зависимостей состоит из трех задач (A, B и C). C зависит от B. B зависит от A. Здесь нет возможности parallelism - алгоритм генерации будет строить что-то похожее на это:

boost::future<void> A, B, C, end;

A.then([]
    {
        B.then([]
            {
                C.get();
                end.get();
            });
    });

(Обратите внимание, что все образцы кода не действительны на 100% - я игнорирую семантику перемещения, пересылку и лямбда-захват.)

Существует множество подходов к решению этой линейной DAG: либо начиная с конца, либо в начале, тривиально построить правильную цепочку обратного вызова.

Все начинает усложняться при вводе вилок и соединений.

Здесь DAG с fork/join:

Пример 1: DAG с fork/join.

Трудно думать о цепочке обратного вызова, которая соответствует этой DAG. Если я попытаюсь работать назад, начиная с конца, мои рассуждения таковы:

  • end зависит от B и D. (присоединиться)
    • D зависит от C.
    • B и C зависят от A. (Вилка)

Возможная цепочка выглядит примерно так:

boost::future<void> A, B, C, D, end;

A.then([]
    {
        boost::when_all(B, C.then([]
                               {
                                   D.get();
                               }))
            .then([]
                {
                    end.get();
                });
    });

Мне было сложно писать эту цепочку вручную, и я также сомневаюсь в ее правильности. Я не мог придумать общий способ реализации алгоритма, который мог бы сгенерировать это - дополнительные трудности также присутствуют в связи с тем, что when_all нуждается в переносе его аргументов.

Посмотрим на последний, еще более сложный пример:

Пример 2: сложный DAG.

Здесь мы хотим максимально использовать parallelism. Рассмотрим задачу E: E можно запустить параллельно с любым из [B, C, D].

Это возможная цепочка обратного вызова:

boost::future<void> A, B, C, D, E, F, end;

A.then([]
    {
        boost::when_all(boost::when_all(B, C).then([]
                            {
                                D.get();
                            }),
            E)
            .then([]
                {
                    F.then([]
                        {
                            end.get();
                        });
                });
    });

Я попытался придумать общий алгоритм несколькими способами:

  • Начиная с начала DAG, пытаясь создать цепочку с продолжением .then(...). Это не работает с объединениями, поскольку задача целевого соединения повторяется несколько раз.

  • Начиная с конца DAG, пытаясь сгенерировать цепочку с помощью продолжений when_all(...). Это не работает с вилками, поскольку node, который создает вилку, повторяется несколько раз.

Очевидно, что подход с "широким первым обходом" здесь не работает. Из образцов кода, которые я написал вручную, кажется, что алгоритм должен знать о вилках и объединениях и должен иметь возможность правильно смешивать продолжения .then(...) и when_all(...).

Вот мои последние вопросы:

  • Можно ли всегда генерировать цепочку обратного вызова future на основе DAG зависимостей задач, где каждая задача появляется только один раз в цепочке обратного вызова?

  • Если да, то каким образом может быть реализован общий алгоритм, который при задании зависимостей задачи DAG создает цепочку обратного вызова?


ИЗМЕНИТЬ 1:

Здесь дополнительный подход Я пытаюсь изучить.

Идея состоит в том, чтобы создать структуру данных карты ([dependencies...] -> [dependents...]) из DAG и создать цепочку обратного вызова с этой карты.

Если len(dependencies...) > 1, то value является соединением node.

Если len(dependents...) > 1, то key является fork node.

Все пары ключ-значение на карте могут быть выражены как when_all(keys...).then(values...) продолжения.

Трудная часть - это правильный порядок, в котором нужно "развернуть" (подумать о чем-то подобном парсеру), узлах и о том, как объединить продолжения fork/join вместе.

Рассмотрим следующее отображение, созданное изображением 4.

depenendencies  |  dependents
----------------|-------------
[F]             :  [end]
[D, E]          :  [F]
[B, C]          :  [D]
[A]             :  [E, C, B]
[begin]         :  [A]

Используя некоторые виды парсеровских сокращений/пропусков, мы можем получить "чистую" цепочку обратного вызова:

// First pass:
// Convert everything to `when_all(...).then(...)` notation
when_all(F).then(end)
when_all(D, E).then(F)
when_all(B, C).then(D)
when_all(A).then(E, C, B)
when_all(begin).then(A)

// Second pass:
// Solve linear (trivial) transformations
when_all(D, E).then(
    when_all(F).then(end)
)
when_all(B, C).then(D)
when_all(
    when_all(begin).then(A)
).then(E, C, B)

// Third pass:
// Solve fork/join transformations
when_all(
    when_all(begin).then(A)
).then(
    when_all(
        E, 
        when_all(B, C).then(D)
    ).then(
        when_all(F).then(end)
    )   
)

Третий проход является самым важным, но также и тот, который выглядит очень сложно для разработки алгоритма для.

Обратите внимание, что [B, C] должен быть найден внутри списка [E, C, B], и как в [D, E] списке зависимостей D следует интерпретировать как результат when_all(B, C).then(D) и связать вместе с E в when_all(E, when_all(B, C).then(D)).

Возможно, вся проблема может быть упрощена, например:

Учитывая карту, состоящую из пар значений ключа [dependencies...] -> [dependents...], как можно реализовать алгоритм, который преобразует эти пары в цепочку продолжения when_all(...)/.then(...)?

ИЗМЕНИТЬ 2:

Здесь некоторый псевдокод. Придумал описанный выше подход. Кажется, что я работаю для DAG, которую я пробовал, но мне нужно потратить больше времени на это и "мысленно" проверить его на другие, более сложные конфигурации DAG.

4b9b3361

Ответ 1

Если могут возникнуть избыточные зависимости, сначала удалите их (см., например, https://mathematica.stackexchange.com/questions/33638/remove-redundant-dependencies-from-a-directed-acyclic-graph).

Затем выполните следующие преобразования графиков (построение подвыражений в объединенных узлах) до тех пор, пока вы не опуститесь до единственного node (аналогично тому, как вы вычислили сеть резисторов):

Graph transformations

*: дополнительные входящие или исходящие зависимости, в зависимости от места размещения

(...): выражение в одном node

Java-код, включающий настройку для более сложного примера:

public class DirectedGraph {
  /** Set of all nodes in the graph */
  static Set<Node> allNodes = new LinkedHashSet<>();

  static class Node {
    /** Set of all preceeding nodes */
    Set<Node> prev = new LinkedHashSet<>();

    /** Set of all following nodes */
    Set<Node> next = new LinkedHashSet<>();

    String value;

    Node(String value) {
      this.value = value;
      allNodes.add(this);
    }

    void addPrev(Node other) {
      prev.add(other);
      other.next.add(this);
    }

    /** Returns one of the next nodes */
    Node anyNext() {
      return next.iterator().next();
    }

    /** Merges this node with other, then removes other */
    void merge(Node other) {
      prev.addAll(other.prev);
      next.addAll(other.next);
      for (Node on: other.next) {
        on.prev.remove(other);
        on.prev.add(this);
      }
      for (Node op: other.prev) {
        op.next.remove(other);
        op.next.add(this);
      }
      prev.remove(this);
      next.remove(this);
      allNodes.remove(other);
    }

    public String toString() {
      return value;
    }
  }

  /** 
   * Merges sequential or parallel nodes following the given node.
   * Returns true if any node was merged.
   */
  public static boolean processNode(Node node) {
    // Check if we are the start of a sequence. Merge if so.
    if (node.next.size() == 1 && node.anyNext().prev.size() == 1) {
      Node then = node.anyNext();
      node.value += " then " + then.value;
      node.merge(then);
      return true;
    }

    // See if any of the next nodes has a parallel node with
    // the same one level indirect target. 
    for (Node next : node.next) {

      // Nodes must have only one in and out connection to be merged.
      if (next.prev.size() == 1 && next.next.size() == 1) {

        // Collect all parallel nodes with only one in and out connection 
        // and the same target; the same source is implied by iterating over 
        // node.next again.
        Node target = next.anyNext().next();
        Set<Node> parallel = new LinkedHashSet<Node>();
        for (Node other: node.next) {
          if (other != next && other.prev.size() == 1
             && other.next.size() == 1 && other.anyNext() == target) {
            parallel.add(other);
          }
        }

        // If we have found any "parallel" nodes, merge them
        if (parallel.size() > 0) {
          StringBuilder sb = new StringBuilder("allNodes(");
          sb.append(next.value);
          for (Node other: parallel) {
            sb.append(", ").append(other.value);
            next.merge(other);
          }
          sb.append(")");
          next.value = sb.toString();
          return true;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Node a = new Node("A");
    Node b = new Node("B");
    Node c = new Node("C");
    Node d = new Node("D");
    Node e = new Node("E");
    Node f = new Node("F");

    f.addPrev(d);
    f.addPrev(e);

    e.addPrev(a);

    d.addPrev(b);
    d.addPrev(c);

    b.addPrev(a);
    c.addPrev(a);

    boolean anyChange;
    do {
      anyChange = false;
      for (Node node: allNodes) {
        if (processNode(node)) {
          anyChange = true;
          // We need to leave the inner loop here because changes
          // invalidate the for iteration. 
          break;
        }
      }
      // We are done if we can't find any node to merge.
    } while (anyChange);

    System.out.println(allNodes.toString());
  }
}

Выход: A then all(E, all(B, C) then D) then F

Ответ 2

Самый простой способ - начать с записи node графика, как будто вы пишете код вручную. Чтобы решить проблему join, вы не можете использовать рекурсивное решение, вам нужно иметь топологическое упорядочение вашего графика, а затем построить график согласно упорядочение.

Это дает гарантию, что при создании node все его предшественники уже созданы.

Для достижения этой цели мы можем использовать DFS с обратным порядком размещения.

Как только у вас есть топологическая сортировка, вы можете забыть исходные идентификаторы node и обратиться к узлам с их номером в списке. Для этого вам нужно создать временную карту компиляции, которая позволяет получить предшественники node с использованием индекса node в топологической сортировке вместо исходного индекса node node.


РЕДАКТИРОВАТЬ. Следуя указаниям по реализации топологической сортировки во время компиляции, я реорганизовал этот ответ.

Чтобы быть на той же странице, я буду считать, что ваш график выглядит следующим образом:

struct mygraph
{
     template<int Id>
     static constexpr auto successors(node_id<Id>) ->
        list< node_id<> ... >; //List of successors for the input node

     template<int Id>
     static constexpr auto predecessors(node_id<Id>) ->
        list< node_id<> ... >; //List of predecessors for the input node

     //Get the task associated with the given node.
     template<int Id>
     static constexpr auto task(node_id<Id>);

     using entry_node = node_id<0>;
};

Шаг 1: топологическая сортировка

Основной ингредиент, который вам нужен, это набор времени компиляции node -ids. В TMP набор также является списком, просто потому, что в set<Ids...> имеет значение порядок Ids. Это означает, что вы можете использовать одну и ту же структуру данных для кодирования информации о том, был ли посещенный node и результирующий заказ в то же время.

/** Topological sort using DFS with reverse-postordering **/
template<class Graph>
struct topological_sort
{
private:
    struct visit;

    // If we reach a node that we already visited, do nothing.
    template<int Id, int ... Is>
    static constexpr auto visit_impl( node_id<Id>,
                                      set<Is...> visited,
                                      std::true_type )
    {
        return visited;
    }

    // This overload kicks in when node has not been visited yet.
    template<int Id, int ... Is>
    static constexpr auto visit_impl( node_id<Id> node,
                                      set<Is...> visited,
                                      std::false_type )
    {
        // Get the list of successors for the current node
        constexpr auto succ = Graph::successors(node);

        // Reverse postordering: we call insert *after* visiting the successors
        // This will call "visit" on each successor, updating the
        // visited set after each step.
        // Then we insert the current node in the set.
        // Notice that if the graph is cyclic we end up in an infinite
        // recursion here.
        return fold( succ,
                     visited,
                     visit() ).insert(node);

        // Conventional DFS would be:
        // return fold( succ, visited.insert(node), visit() );
    }

    struct visit
    {
        // Dispatch to visit_impl depending on the result of visited.contains(node)
        // Note that "contains" returns a type convertible to
        // integral_constant<bool,x>
        template<int Id, int ... Is>
        constexpr auto operator()( set<Is...> visited, node_id<Id> node ) const
        {
            return visit_impl(node, visited, visited.contains(node) );
        }
    };

public:
    template<int StartNodeId>
    static constexpr auto compute( node_id<StartNodeId> node )
    {
        // Start visiting from the entry node
        // The set of visited nodes is initially empty.
        // "as_list" converts set<Is ... > to list< node_id<Is> ... >.
        return reverse( visit()( set<>{}, node ).as_list() );
    }
};

Этот алгоритм с графиком из вашего последнего примера (при условии A = node_id<0>, B = node_id<1> и т.д.) создает list<A,B,C,D,E,F>.

Шаг 2: отображение графика

Это просто адаптер, который изменяет идентификатор каждого node на вашем графике в соответствии с данным заказом. Предположим, что предыдущие шаги вернули list<C,D,A,B>, этот graph_map отобразил бы индекс 0 на C, индекс 1 на D и т.д.

template<class Graph, class List>
class graph_map
{   
    // Convert a node_id from underlying graph.
    // Use a function-object so that it can be passed to algorithms.
    struct from_underlying
    { 
        template<int I>
        constexpr auto operator()(node_id<I> id) 
        { return node_id< find(id, List{}) >{}; }
    };

    struct to_underlying
    { 
        template<int I>
        constexpr auto operator()(node_id<I> id) 
        { return get<I>(List{}); }
    };

public:        
    template<int Id>
    static constexpr auto successors( node_id<Id> id )
    {
        constexpr auto orig_id = to_underlying()(id);
        constexpr auto orig_succ = Graph::successors( orig_id );
        return transform( orig_succ, from_underlying() );
    }

    template<int Id>
    static constexpr auto predecessors( node_id<Id> id )
    {
        constexpr auto orig_id = to_underlying()(id);
        constexpr auto orig_succ = Graph::predecessors( orig_id );
        return transform( orig_succ, from_underlying() );
    }

    template<int Id>
    static constexpr auto task( node_id<Id> id )
    {
        return Graph::task( to_underlying()(id) );
    }

    using entry_node = decltype( from_underlying()( typename Graph::entry_node{} ) );
};

Шаг 3: соберите результат

Теперь мы можем перебирать каждый node id по порядку. Благодаря тому, как мы построили график, мы знаем, что все предшественники I имеют node id, который меньше I, для всех возможных node I.

// Returns a tuple<> of futures
template<class GraphMap, class ... Ts>
auto make_cont( std::tuple< future<Ts> ... > && pred )
{
     // The next node to work with is N:
     constexpr auto current_node = node_id< sizeof ... (Ts) >();

     // Get a list of all the predecessors for the current node.
     auto indices = GraphMap::predecessors( current_node );

     // "select" is some magic function that takes a tuple of Ts
     // and an index_sequence, and returns a tuple of references to the elements 
     // from the input tuple that are in the indices list. 
     auto futures = select( pred, indices );

     // Assuming you have an overload of when_all that takes a tuple,
     // otherwise use C++17 apply.
     auto join = when_all( futures );

     // Note: when_all with an empty parameter list returns a future< tuple<> >,
     // which is always ready.
     // In general this has to be a shared_future, but you can avoid that
     // by checking if this node has only one successor.
     auto next = join.then( GraphMap::task( current_node ) ).share();

     // Return a new tuple of futures, pushing the new future at the back.
     return std::tuple_cat( std::move(pred),
                            std::make_tuple(std::move(next)) );         
}


// Returns a tuple of futures, you can take the last element if you
// know that your DAG has only one leaf, or do some additional 
// processing to extract only the leaf nodes.
template<class Graph>
auto make_callback_chain()
{
    constexpr auto entry_node = typename Graph::entry_node{};

    constexpr auto sorted_list = 
         topological_sort<Graph>::compute( entry_node );

    using map = graph_map< Graph, decltype(sorted_list) >;

    // Note: we are not really using the "index" in the functor here, 
    // we only want to call make_cont once for each node in the graph
    return fold( sorted_list, 
                 std::make_tuple(), //Start with an empty tuple
                 []( auto && tuple, auto index )
                 {
                     return make_cont<map>(std::move(tuple));
                 } );
}

Полная демо-версия

Ответ 3

Это кажется достаточно легким, если вы перестанете думать об этом в форме явных зависимостей и организации DAG. Каждая задача может быть организована следующим образом (С#, потому что это намного проще объяснить идею):

class MyTask
{
    // a list of all tasks that depend on this to be finished
    private readonly ICollection<MyTask> _dependenants;
    // number of not finished dependencies of this task
    private int _nrDependencies;

    public int NrDependencies
    {
        get { return _nrDependencies; }
        private set { _nrDependencies = value; }
    }
}

Если у вас организована ваша DAG в такой форме, проблема на самом деле очень проста: каждая задача, где _nrDependencies == 0 может быть выполнена. Поэтому нам нужен метод run, который выглядит примерно так:

public async Task RunTask()
{
    // Execute actual code of the task.
    var tasks = new List<Task>();
    foreach (var dependent in _dependenants)
    {
        if (Interlocked.Decrement(ref dependent._nrDependencies) == 0)
        {
            tasks.Add(Task.Run(() => dependent.RunTask()));
        }
    }
    await Task.WhenAll(tasks);
}

В принципе, как только наша задача завершится, мы пройдем через всех наших иждивенцев и выполним все те, у которых больше нет незавершенных зависимостей.

Чтобы начать все это, вам нужно только позвонить RunTask() для всех задач, для которых есть нулевые иждивенцы (хотя бы один из них должен существовать, так как у нас есть DAG). Как только все эти задачи завершатся, мы знаем, что весь DAG выполнен.

Ответ 4

Этот график не построен во время компиляции, но мне это не ясно, если это требование. График удерживается в графе ускорения, реализованном как adjacency_list<vecS, vecS, bidirectionalS>. Задачи будут запускаться только одной отправкой. Нам просто нужны краны на каждом node, так что мы знаем только то, что ожидаем. Это предварительно рассчитано при создании экземпляра в планировщике ниже.

Я утверждаю, что полная топологическая сортировка не нужна.

Например, если граф зависимостей:

введите описание изображения здесь

использовать scheduler_driver.cpp

Для соединения, как в

введите описание изображения здесь

просто переопределите Graph, чтобы определить направленные ребра.

Итак, чтобы ответить на ваши 2 вопроса:

. Да, для DAG. Для каждого node необходимы только уникальные непосредственные зависимости, которые можно предварительно вычислить, как показано ниже. Затем цепочку зависимостей можно инициировать с помощью одной отправки, а цепочка домино падает.

. Да, см. Алгоритм ниже (с использованием потоков С++ 11, а не boost::thread). Для forks для связи требуется a shared_future, а соединения поддерживаются с помощью связи future.

scheduler_driver.hpp:

#ifndef __SCHEDULER_DRIVER_HPP__
#define __SCHEDULER_DRIVER_HPP__

#include <iostream>
#include <ostream>
#include <iterator>
#include <vector>
#include <chrono>

#include "scheduler.h"

#endif

scheduler_driver.cpp:

#include "scheduler_driver.hpp"

enum task_nodes
  {
    task_0,
    task_1,
    task_2,
    task_3,
    task_4,
    task_5,
    task_6,
    task_7,
    task_8,
    task_9,
    N
  };

int basic_task(int a, int d)
{
  std::chrono::milliseconds sleepDuration(d);
  std::this_thread::sleep_for(sleepDuration);
  std::cout << "Result: " << a << "\n";
  return a;
}

using namespace SCHEDULER;

int main(int argc, char **argv)
{

  using F = std::function<R()>;

  Graph deps(N);
  boost::add_edge(task_0, task_1, deps);
  boost::add_edge(task_0, task_2, deps);
  boost::add_edge(task_0, task_3, deps);
  boost::add_edge(task_1, task_4, deps);
  boost::add_edge(task_1, task_5, deps);
  boost::add_edge(task_1, task_6, deps);
  boost::add_edge(task_2, task_7, deps);
  boost::add_edge(task_2, task_8, deps);
  boost::add_edge(task_2, task_9, deps);

  std::vector<F> tasks = 
    {
      std::bind(basic_task, 0, 1000),
      std::bind(basic_task, 1, 1000),
      std::bind(basic_task, 2, 1000),
      std::bind(basic_task, 3, 1000),
      std::bind(basic_task, 4, 1000),
      std::bind(basic_task, 5, 1000),
      std::bind(basic_task, 6, 1000),
      std::bind(basic_task, 7, 1000),
      std::bind(basic_task, 8, 1000),
      std::bind(basic_task, 9, 1000)
    };

  auto s = std::make_unique<scheduler<int>>(std::move(deps), std::move(tasks));
  s->doit();

  return 0;
}

scheduler.h:

#ifndef __SCHEDULER2_H__
#define __SCHEDULER2_H__

#include <iostream>
#include <vector>
#include <iterator>
#include <functional>
#include <algorithm>
#include <mutex>
#include <thread>
#include <future>
#include <boost/graph/graph_traits.hpp>
#include <boost/graph/adjacency_list.hpp>
#include <boost/graph/depth_first_search.hpp>
#include <boost/graph/visitors.hpp>

using namespace boost;

namespace SCHEDULER
{

  using Graph = adjacency_list<vecS, vecS, bidirectionalS>;
  using Edge = graph_traits<Graph>::edge_descriptor;
  using Vertex = graph_traits<Graph>::vertex_descriptor;
  using VectexCont = std::vector<Vertex>;
  using outIt = graph_traits<Graph>::out_edge_iterator;
  using inIt = graph_traits<Graph>::in_edge_iterator;

  template<typename R>
    class scheduler
    {
    public:
      using ret_type = R;
      using fun_type = std::function<R()>;
      using prom_type = std::promise<ret_type>;
      using fut_type = std::shared_future<ret_type>;

      scheduler() = default;
      scheduler(const Graph &deps_, const std::vector<fun_type> &tasks_) :
        g(deps_),
        tasks(tasks_) { init_();}
        scheduler(Graph&& deps_, std::vector<fun_type>&& tasks_) :
          g(std::move(deps_)),
          tasks(std::move(tasks_)) { init_(); }
        scheduler(const scheduler&) = delete;
        scheduler& operator=(const scheduler&) = delete;

        void doit();

    private:
        void init_();
        std::list<Vertex> get_sources(const Vertex& v);
        auto task_thread(fun_type&& f, int i);

        Graph g;
        std::vector<fun_type> tasks;
        std::vector<prom_type> prom;
        std::vector<fut_type> fut;
        std::vector<std::thread> th;
        std::vector<std::list<Vertex>> sources;

    };

  template<typename R>
    void
    scheduler<R>::init_()
    {
      int num_tasks = tasks.size();

      prom.resize(num_tasks);
      fut.resize(num_tasks);

      // Get the futures
      for(size_t i=0;
          i<num_tasks;
          ++i)
        {
          fut[i] = prom[i].get_future();
        }

      // Predetermine in_edges for faster traversal
      sources.resize(num_tasks);
      for(size_t i=0;
          i<num_tasks;
          ++i)
        {
          sources[i] = get_sources(i);
        }
    }

  template<typename R>
    std::list<Vertex>
    scheduler<R>::get_sources(const Vertex& v)
    {
      std::list<Vertex> r;
      Vertex v1;
      inIt j, j_end;
      boost::tie(j,j_end) = in_edges(v, g);
      for(;j != j_end;++j)
        {
          v1 = source(*j, g);
          r.push_back(v1);
        }
      return r;
    }

  template<typename R>
    auto
    scheduler<R>::task_thread(fun_type&& f, int i)
    {
      auto j_beg = sources[i].begin(), 
        j_end = sources[i].end();
      for(;
          j_beg != j_end;
          ++j_beg)
        {
          R val = fut[*j_beg].get();
        }

      return std::thread([this](fun_type f, int i)
                         {
                           prom[i].set_value(f());
                         },f,i);
    }

  template<typename R>
    void
    scheduler<R>::doit()
    {
      size_t num_tasks = tasks.size();
      th.resize(num_tasks);

      for(int i=0;
          i<num_tasks;
          ++i)
        {
          th[i] = task_thread(std::move(tasks[i]), i);
        }
      for_each(th.begin(), th.end(), mem_fn(&std::thread::join));
    }

} // namespace SCHEDULER

#endif

Ответ 5

Я не уверен, что такое ваша установка и почему вам нужно создать DAG, но я думаю, что достаточно простого жадного алгоритма.

when (some task have finished) {
     mark output resources done;
     find all tasks that can be run;
     post them to thread pool;
}

Ответ 6

Рассмотрим использование библиотеки Intel TBB Flow Graph.