Подтвердить что ты не робот

Что определяет количество потоков, создаваемых Java ForkJoinPool?

Насколько я понял ForkJoinPool, этот пул создает фиксированное количество потоков (по умолчанию: количество ядер) и никогда не будет создавать больше потоков (если приложение не указывает на необходимость использования этих средств с помощью managedBlock).

Однако, используя ForkJoinPool.getPoolSize(), я обнаружил, что в программе, которая создает 30 000 задач (RecursiveAction), ForkJoinPool, выполняющий эти задачи, использует в среднем 700 потоков (потоки подсчитываются каждый раз при создании задачи). Задачи не выполняют I/O, а просто вычисляют; единственная межзадачная синхронизация вызывает ForkJoinTask.join() и получает доступ к AtomicBoolean s, т.е. нет операций блокировки потоков.

Так как join() не блокирует вызывающий поток, насколько я его понимаю, нет причин, по которым какой-либо поток в пуле должен блокироваться, и поэтому (я предположил) не должно быть причин создавать какие-либо дополнительные потоки ( что, очевидно, все же происходит).

Итак, почему ForkJoinPool создает столько потоков? Какие факторы определяют количество созданных потоков?

Я надеялся, что на этот вопрос можно ответить без размещения кода, но здесь он приходит по запросу. Этот код представляет собой отрывок из программы, в четыре раза превышающей размер, сводящийся к основным частям; он не компилируется, как есть. При желании, я могу, конечно, разместить полную программу.

Программа ищет лабиринт для пути от заданной начальной точки до заданной конечной точки с использованием поиска по глубине. Решение гарантировано. Основная логика заключается в методе compute() SolverTask: A RecursiveAction, который начинается в некоторой заданной точке и продолжается со всеми соседними точками, достижимыми с текущей точки. Вместо того, чтобы создавать новую SolverTask в каждой точке ветвления (которая создавала бы слишком много задач), она подталкивает всех соседей, кроме одного, в стек backtracking, который будет обрабатываться позже, и продолжается только с одним соседним, не помещенным в стек. Как только он достигает тупика таким образом, точка, которая была недавно переброшена в стек возврата, выскользнула, и поиск продолжается оттуда (сокращая путь, построенный из начальной точки taks соответственно). Новая задача создается после того, как задача обнаруживает, что стек обратного отслеживания больше определенного порога; с этого времени задача, продолжая выходить из стека обратной трассировки до тех пор, пока она не исчерпана, не будет выталкивать какие-либо дополнительные точки в свой стек при достижении точки ветвления, а создает новую задачу для каждой такой точки. Таким образом, размер задач может быть скорректирован с использованием порогового предела стека.

Цифры, приведенные выше ( "30 000 задач, в среднем 700 потоков" ), находятся в поисках лабиринта 5000х5000 ячеек. Итак, вот главный код:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;

/**
 * @return Tries to compute a path through the maze from local start to end
 * and returns that (or null if no such path found)
 */
@Override
public ArrayDeque<Point>  compute() {
    // Is this task still accepting new branches for processing on its own,
    // or will it create new tasks to handle those?
    boolean stillAcceptingNewBranches = true;
    Point current = localStart;
    ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  // Path from localStart to (including) current
    ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
    // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later

    Direction[] allDirections = Direction.values();

    while (!current.equals(end)) {
        pathFromLocalStart.addLast(current);
        // Collect current unvisited neighbors in random order: 
        ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
        for (Direction directionToNeighbor: allDirections) {
            Point neighbor = current.getNeighbor(directionToNeighbor);

            // contains() and hasPassage() are read-only methods and thus need no synchronization
            if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
        }
        // Process unvisited neighbors
        if (neighborsToVisit.size() == 1) {
            // Current node is no branch: Continue with that neighbor
            current = neighborsToVisit.getFirst().getPoint();
            continue;
        }
        if (neighborsToVisit.size() >= 2) {
            // Current node is a branch
            if (stillAcceptingNewBranches) {
                current = neighborsToVisit.removeLast().getPoint();
                // Push all neighbors except one on the backtrack stack for later processing
                for(PointAndDirection neighborAndDirection: neighborsToVisit) 
                    backtrackStack.push(neighborAndDirection);
                if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                    stillAcceptingNewBranches = false;
                // Continue with the one neighbor that was not pushed onto the backtrack stack
                continue;
            } else {
                // Current node is a branch point, but this task does not accept new branches any more: 
                // Create new task for each neighbor to visit and wait for the end of those tasks
                SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                int t = 0;
                for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
                    SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                    task.fork();
                    subTasks[t++] = task;
                }
                for (SolverTask task: subTasks) {
                    ArrayDeque<Point> subTaskResult = null;
                    try {
                        subTaskResult = task.join();
                    } catch (CancellationException e) {
                        // Nothing to do here: Another task has found the solution and cancelled all other tasks
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (subTaskResult != null) { // subtask found solution
                        pathFromLocalStart.addAll(subTaskResult);
                        // No need to wait for the other subtasks once a solution has been found
                        return pathFromLocalStart;
                    }
                } // for subTasks
            } // else (not accepting any more branches) 
        } // if (current node is a branch)
        // Current node is dead end or all its neighbors lead to dead ends:
        // Continue with a node from the backtracking stack, if any is left:
        if (backtrackStack.isEmpty()) {
            return null; // No more backtracking avaible: No solution exists => end of this task
        }
        // Backtrack: Continue with cell saved at latest branching point:
        PointAndDirection pd = backtrackStack.pop();
        current = pd.getPoint();
        Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
        // DEBUG System.out.println("Backtracking to " +  branchingPoint);
        // Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
        while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
            // DEBUG System.out.println("    Going back before " + pathSoFar.peekLast());
            pathFromLocalStart.removeLast();
        }
        // continue while loop with newly popped current
    } // while (current ...
    if (!current.equals(end)) {         
        // this task was interrupted by another one that already found the solution 
        // and should end now therefore:
        return null;
    } else {
        // Found the solution path:
        pathFromLocalStart.addLast(current);
        return pathFromLocalStart;
    }
} // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze  {

// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;

/**
 * Atomically marks this point as visited unless visited before
 * @return whether the point was visited for the first time, i.e. whether it could be marked
 */
boolean visit(Point p) {
    return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
    // Start initial task
    long startTime = System.currentTimeMillis();
     // since SolverTask.compute() expects its starting point already visited, 
    // must do that explicitly for the global starting point:
    maze.visit(maze.start);
    maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
    // One solution is enough: Stop all tasks that are still running
    pool.shutdownNow();
    pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
    long endTime = System.currentTimeMillis();
    System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
            width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}
4b9b3361

Ответ 1

Есть вопросы, связанные с stackoverflow:

Закладки ForkJoinPool во время invokeAll/join

ForkJoinPool, кажется, теряет поток

Я сделал runnable урезанную версию того, что происходит (аргументы jvm, которые я использовал: -Xms256m -Xmx1024m -Xss8m):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class Test1 {

    private static ForkJoinPool pool = new ForkJoinPool(2);

    private static class SomeAction extends RecursiveAction {

        private int counter;         //recursive counter
        private int childrenCount=80;//amount of children to spawn
        private int idx;             // just for displaying

        private SomeAction(int counter, int idx) {
            this.counter = counter;
            this.idx = idx;
        }

        @Override
        protected void compute() {

            System.out.println(
                "counter=" + counter + "." + idx +
                " activeThreads=" + pool.getActiveThreadCount() +
                " runningThreads=" + pool.getRunningThreadCount() +
                " poolSize=" + pool.getPoolSize() +
                " queuedTasks=" + pool.getQueuedTaskCount() +
                " queuedSubmissions=" + pool.getQueuedSubmissionCount() +
                " parallelism=" + pool.getParallelism() +
                " stealCount=" + pool.getStealCount());
            if (counter <= 0) return;

            List<SomeAction> list = new ArrayList<>(childrenCount);
            for (int i=0;i<childrenCount;i++){
                SomeAction next = new SomeAction(counter-1,i);
                list.add(next);
                next.fork();
            }


            for (SomeAction action:list){
                action.join();
            }
        }
    }

    public static void main(String[] args) throws Exception{
        pool.invoke(new SomeAction(2,0));
    }
}

По-видимому, когда вы выполняете соединение, текущий поток видит, что требуемая задача еще не завершена, и для этого требуется другая задача.

Это происходит в java.util.concurrent.ForkJoinWorkerThread#joinTask.

Однако эта новая задача порождает больше одних и тех же задач, но они не могут найти потоки в пуле, потому что потоки заблокированы в соединении. И поскольку у него нет способа узнать, сколько времени потребуется для их освобождения (нить может быть бесконечной петлей или заперта навсегда), новая нить (и) порождена (компенсируется для присоединенных потоков, как упоминал Луис Вассерман ): java.util.concurrent.ForkJoinPool#signalWork

Поэтому для предотвращения такого сценария вам необходимо избегать рекурсивного нереста задач.

Например, если в приведенном выше коде вы устанавливаете начальный параметр равным 1, активная величина потока будет равна 2, даже если вы увеличите число детей в десятикратном размере.

Также обратите внимание, что при увеличении количества активных потоков количество работающих потоков меньше или равно parallelism.

Ответ 2

Из исходных комментариев:

Компенсация: если уже существует достаточно живых потоков, метод tryPreBlock() может создать или повторно активировать запасной поток, чтобы компенсировать блокированные столяры, пока они не разблокируются.

Я думаю, что случается так, что вы не очень быстро заканчиваете какие-либо задачи, и поскольку при отправке новой задачи нет рабочих потоков, создается новый поток.

Ответ 3

строгий, полный-строгий и терминально-строгий подход к обработке ориентированного ациклического графа (DAG). Вы можете использовать эти термины, чтобы получить полное представление о них. Это тип обработки, который был разработан для обработки. Посмотрите на код в API для рекурсивных..., структура использует ваш код compute() для других ссылок compute(), а затем выполните команду join(). Каждая задача выполняет одно соединение() так же, как обработка DAG.

Вы не выполняете обработку DAG. Вы открываете много новых задач и ожидаете (join()) для каждого. Прочитайте исходный код. Это ужасно сложно, но вы можете понять это. В рамках этой структуры не выполняется надлежащее управление задачами. Где он собирается поставить ожидающую задачу, когда она выполняет соединение()? Не существует приостановленной очереди, для которой требуется, чтобы поток монитора постоянно смотрел на очередь, чтобы увидеть, что закончилось. Вот почему структура использует "продолжения потоков". Когда одна задача соединяется(), платформа предполагает, что она ожидает завершения одной нижней задачи. Когда много методов join() присутствуют, поток не может продолжаться, поэтому существует вспомогательный поток или продолжение.

Как отмечалось выше, вам нужен процесс fork-join типа sort-gather. Там вы можете развить столько задач

Ответ 4

Оба фрагмента кода, отправленные Holger Peine и elusive-code, фактически не следуют рекомендуемой практике который появился в javadoc для версии 1.8:

В наиболее типичных случаях использование пары fork-join действует как вызов (fork) и return (join) от параллельной рекурсивной функции. Как есть случай с другими формами рекурсивных вызовов, возвращает (объединяет) следует выполнять в первую очередь. Например, a.fork(); b.fork(); b.join(); a.join(); вероятно, будет значительно больше эффективнее, чем объединение кода a перед кодом b.

В обоих случаях FJPool был создан с использованием конструктора по умолчанию. Это приводит к созданию пула с asyncMode = false, который по умолчанию:

@param asyncMode, если true,
    устанавливает локальный режим планирования в первом-в-первом режиме для раздвоенного    задачи, которые никогда не соединяются. Этот режим может быть более уместным    чем режим локального стека по умолчанию в приложениях, в которых    рабочие потоки обрабатывают только асинхронные задачи в стиле событий.    Для значения по умолчанию используйте false.

Таким образом, рабочая очередь - это фактически lifo:
голова → | t4 | t3 | t2 | t1 |... | < - tail

Таким образом, в фрагментах они fork() выполняют все задачи, нажимая их на стек и join() в том же порядке, то есть от самой глубокой задачи (t1) до вершины (t4), эффективно блокирующей до тех пор, пока какой-нибудь другой поток не украдет (t1), затем (t2) и так далее. Поскольку есть задачи enouth, чтобы блокировать все потоки пулов (task_count → pool.getParallelism()), компенсация срабатывает, как описано Louis Wasserman.