Насколько я понял ForkJoinPool
, этот пул создает фиксированное количество потоков (по умолчанию: количество ядер) и никогда не будет создавать больше потоков (если приложение не указывает на необходимость использования этих средств с помощью managedBlock
).
Однако, используя ForkJoinPool.getPoolSize()
, я обнаружил, что в программе, которая создает 30 000 задач (RecursiveAction
), ForkJoinPool
, выполняющий эти задачи, использует в среднем 700 потоков (потоки подсчитываются каждый раз при создании задачи). Задачи не выполняют I/O, а просто вычисляют; единственная межзадачная синхронизация вызывает ForkJoinTask.join()
и получает доступ к AtomicBoolean
s, т.е. нет операций блокировки потоков.
Так как join()
не блокирует вызывающий поток, насколько я его понимаю, нет причин, по которым какой-либо поток в пуле должен блокироваться, и поэтому (я предположил) не должно быть причин создавать какие-либо дополнительные потоки ( что, очевидно, все же происходит).
Итак, почему ForkJoinPool
создает столько потоков? Какие факторы определяют количество созданных потоков?
Я надеялся, что на этот вопрос можно ответить без размещения кода, но здесь он приходит по запросу. Этот код представляет собой отрывок из программы, в четыре раза превышающей размер, сводящийся к основным частям; он не компилируется, как есть. При желании, я могу, конечно, разместить полную программу.
Программа ищет лабиринт для пути от заданной начальной точки до заданной конечной точки с использованием поиска по глубине. Решение гарантировано. Основная логика заключается в методе compute()
SolverTask
: A RecursiveAction
, который начинается в некоторой заданной точке и продолжается со всеми соседними точками, достижимыми с текущей точки. Вместо того, чтобы создавать новую SolverTask
в каждой точке ветвления (которая создавала бы слишком много задач), она подталкивает всех соседей, кроме одного, в стек backtracking, который будет обрабатываться позже, и продолжается только с одним соседним, не помещенным в стек. Как только он достигает тупика таким образом, точка, которая была недавно переброшена в стек возврата, выскользнула, и поиск продолжается оттуда (сокращая путь, построенный из начальной точки taks соответственно). Новая задача создается после того, как задача обнаруживает, что стек обратного отслеживания больше определенного порога; с этого времени задача, продолжая выходить из стека обратной трассировки до тех пор, пока она не исчерпана, не будет выталкивать какие-либо дополнительные точки в свой стек при достижении точки ветвления, а создает новую задачу для каждой такой точки. Таким образом, размер задач может быть скорректирован с использованием порогового предела стека.
Цифры, приведенные выше ( "30 000 задач, в среднем 700 потоков" ), находятся в поисках лабиринта 5000х5000 ячеек. Итак, вот главный код:
class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;
/**
* @return Tries to compute a path through the maze from local start to end
* and returns that (or null if no such path found)
*/
@Override
public ArrayDeque<Point> compute() {
// Is this task still accepting new branches for processing on its own,
// or will it create new tasks to handle those?
boolean stillAcceptingNewBranches = true;
Point current = localStart;
ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>(); // Path from localStart to (including) current
ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
// Used as a stack: Branches not yet taken; solver will backtrack to these branching points later
Direction[] allDirections = Direction.values();
while (!current.equals(end)) {
pathFromLocalStart.addLast(current);
// Collect current unvisited neighbors in random order:
ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);
for (Direction directionToNeighbor: allDirections) {
Point neighbor = current.getNeighbor(directionToNeighbor);
// contains() and hasPassage() are read-only methods and thus need no synchronization
if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
}
// Process unvisited neighbors
if (neighborsToVisit.size() == 1) {
// Current node is no branch: Continue with that neighbor
current = neighborsToVisit.getFirst().getPoint();
continue;
}
if (neighborsToVisit.size() >= 2) {
// Current node is a branch
if (stillAcceptingNewBranches) {
current = neighborsToVisit.removeLast().getPoint();
// Push all neighbors except one on the backtrack stack for later processing
for(PointAndDirection neighborAndDirection: neighborsToVisit)
backtrackStack.push(neighborAndDirection);
if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
stillAcceptingNewBranches = false;
// Continue with the one neighbor that was not pushed onto the backtrack stack
continue;
} else {
// Current node is a branch point, but this task does not accept new branches any more:
// Create new task for each neighbor to visit and wait for the end of those tasks
SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
int t = 0;
for(PointAndDirection neighborAndDirection: neighborsToVisit) {
SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
task.fork();
subTasks[t++] = task;
}
for (SolverTask task: subTasks) {
ArrayDeque<Point> subTaskResult = null;
try {
subTaskResult = task.join();
} catch (CancellationException e) {
// Nothing to do here: Another task has found the solution and cancelled all other tasks
}
catch (Exception e) {
e.printStackTrace();
}
if (subTaskResult != null) { // subtask found solution
pathFromLocalStart.addAll(subTaskResult);
// No need to wait for the other subtasks once a solution has been found
return pathFromLocalStart;
}
} // for subTasks
} // else (not accepting any more branches)
} // if (current node is a branch)
// Current node is dead end or all its neighbors lead to dead ends:
// Continue with a node from the backtracking stack, if any is left:
if (backtrackStack.isEmpty()) {
return null; // No more backtracking avaible: No solution exists => end of this task
}
// Backtrack: Continue with cell saved at latest branching point:
PointAndDirection pd = backtrackStack.pop();
current = pd.getPoint();
Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
// DEBUG System.out.println("Backtracking to " + branchingPoint);
// Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
// DEBUG System.out.println(" Going back before " + pathSoFar.peekLast());
pathFromLocalStart.removeLast();
}
// continue while loop with newly popped current
} // while (current ...
if (!current.equals(end)) {
// this task was interrupted by another one that already found the solution
// and should end now therefore:
return null;
} else {
// Found the solution path:
pathFromLocalStart.addLast(current);
return pathFromLocalStart;
}
} // compute()
} // class SolverTask
@SuppressWarnings("serial")
public class ParallelMaze {
// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;
/**
* Atomically marks this point as visited unless visited before
* @return whether the point was visited for the first time, i.e. whether it could be marked
*/
boolean visit(Point p) {
return visited[p.getX()][p.getY()].compareAndSet(false, true);
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
// Start initial task
long startTime = System.currentTimeMillis();
// since SolverTask.compute() expects its starting point already visited,
// must do that explicitly for the global starting point:
maze.visit(maze.start);
maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
// One solution is enough: Stop all tasks that are still running
pool.shutdownNow();
pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
long endTime = System.currentTimeMillis();
System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " +
width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}