У меня есть аймический график ациклический график асинхронных задач. DAG показывает зависимости между задачами: анализируя его, можно понять, какие задачи могут выполняться параллельно (в отдельных потоках) и какие задачи должны ждать завершения других задач до их начала (зависимости).
Я хочу создать цепочку обратного вызова из DAG, используя вспомогательные функции продолжения boost::future
и .then(...)
, when_all(...)
. Результатом этого поколения будет функция, которая при вызове начнет цепочку обратного вызова и выполнит задачи, описанные в DAG, выполнив как можно больше задач параллельно.
У меня возникла проблема, однако, найти общий алгоритм, который может работать для всех случаев.
Я сделал несколько рисунков, чтобы облегчить понимание проблемы. Это легенда, которая покажет вам, что означают символы на чертежах:
Начнем с простой, линейной DAG:
Этот график зависимостей состоит из трех задач (A
, B
и C
). C
зависит от B
. B
зависит от A
. Здесь нет возможности parallelism - алгоритм генерации будет строить что-то похожее на это:
boost::future<void> A, B, C, end;
A.then([]
{
B.then([]
{
C.get();
end.get();
});
});
(Обратите внимание, что все образцы кода не действительны на 100% - я игнорирую семантику перемещения, пересылку и лямбда-захват.)
Существует множество подходов к решению этой линейной DAG: либо начиная с конца, либо в начале, тривиально построить правильную цепочку обратного вызова.
Все начинает усложняться при вводе вилок и соединений.
Здесь DAG с fork/join:
Трудно думать о цепочке обратного вызова, которая соответствует этой DAG. Если я попытаюсь работать назад, начиная с конца, мои рассуждения таковы:
-
end
зависит отB
иD
. (присоединиться)-
D
зависит отC
. -
B
иC
зависят отA
. (Вилка)
-
Возможная цепочка выглядит примерно так:
boost::future<void> A, B, C, D, end;
A.then([]
{
boost::when_all(B, C.then([]
{
D.get();
}))
.then([]
{
end.get();
});
});
Мне было сложно писать эту цепочку вручную, и я также сомневаюсь в ее правильности. Я не мог придумать общий способ реализации алгоритма, который мог бы сгенерировать это - дополнительные трудности также присутствуют в связи с тем, что when_all
нуждается в переносе его аргументов.
Посмотрим на последний, еще более сложный пример:
Здесь мы хотим максимально использовать parallelism. Рассмотрим задачу E
: E
можно запустить параллельно с любым из [B, C, D]
.
Это возможная цепочка обратного вызова:
boost::future<void> A, B, C, D, E, F, end;
A.then([]
{
boost::when_all(boost::when_all(B, C).then([]
{
D.get();
}),
E)
.then([]
{
F.then([]
{
end.get();
});
});
});
Я попытался придумать общий алгоритм несколькими способами:
-
Начиная с начала DAG, пытаясь создать цепочку с продолжением
.then(...)
. Это не работает с объединениями, поскольку задача целевого соединения повторяется несколько раз. -
Начиная с конца DAG, пытаясь сгенерировать цепочку с помощью продолжений
when_all(...)
. Это не работает с вилками, поскольку node, который создает вилку, повторяется несколько раз.
Очевидно, что подход с "широким первым обходом" здесь не работает. Из образцов кода, которые я написал вручную, кажется, что алгоритм должен знать о вилках и объединениях и должен иметь возможность правильно смешивать продолжения .then(...)
и when_all(...)
.
Вот мои последние вопросы:
-
Можно ли всегда генерировать цепочку обратного вызова
future
на основе DAG зависимостей задач, где каждая задача появляется только один раз в цепочке обратного вызова? -
Если да, то каким образом может быть реализован общий алгоритм, который при задании зависимостей задачи DAG создает цепочку обратного вызова?
ИЗМЕНИТЬ 1:
Здесь дополнительный подход Я пытаюсь изучить.
Идея состоит в том, чтобы создать структуру данных карты ([dependencies...] -> [dependents...])
из DAG и создать цепочку обратного вызова с этой карты.
Если len(dependencies...) > 1
, то value
является соединением node.
Если len(dependents...) > 1
, то key
является fork node.
Все пары ключ-значение на карте могут быть выражены как when_all(keys...).then(values...)
продолжения.
Трудная часть - это правильный порядок, в котором нужно "развернуть" (подумать о чем-то подобном парсеру), узлах и о том, как объединить продолжения fork/join вместе.
Рассмотрим следующее отображение, созданное изображением 4.
depenendencies | dependents
----------------|-------------
[F] : [end]
[D, E] : [F]
[B, C] : [D]
[A] : [E, C, B]
[begin] : [A]
Используя некоторые виды парсеровских сокращений/пропусков, мы можем получить "чистую" цепочку обратного вызова:
// First pass:
// Convert everything to `when_all(...).then(...)` notation
when_all(F).then(end)
when_all(D, E).then(F)
when_all(B, C).then(D)
when_all(A).then(E, C, B)
when_all(begin).then(A)
// Second pass:
// Solve linear (trivial) transformations
when_all(D, E).then(
when_all(F).then(end)
)
when_all(B, C).then(D)
when_all(
when_all(begin).then(A)
).then(E, C, B)
// Third pass:
// Solve fork/join transformations
when_all(
when_all(begin).then(A)
).then(
when_all(
E,
when_all(B, C).then(D)
).then(
when_all(F).then(end)
)
)
Третий проход является самым важным, но также и тот, который выглядит очень сложно для разработки алгоритма для.
Обратите внимание, что [B, C]
должен быть найден внутри списка [E, C, B]
, и как в [D, E]
списке зависимостей D
следует интерпретировать как результат when_all(B, C).then(D)
и связать вместе с E
в when_all(E, when_all(B, C).then(D))
.
Возможно, вся проблема может быть упрощена, например:
Учитывая карту, состоящую из пар значений ключа [dependencies...] -> [dependents...]
, как можно реализовать алгоритм, который преобразует эти пары в цепочку продолжения when_all(...)
/.then(...)
?
ИЗМЕНИТЬ 2:
Здесь некоторый псевдокод. Придумал описанный выше подход. Кажется, что я работаю для DAG, которую я пробовал, но мне нужно потратить больше времени на это и "мысленно" проверить его на другие, более сложные конфигурации DAG.