Подтвердить что ты не робот

Есть ли готовый пул потоков с несколькими очередями (которые обеспечивают последовательную обработку каждой очереди)?

Среди всех моих задач у меня есть некоторые, которые должны обрабатываться последовательно (они никогда не могут запускаться одновременно, и их нужно обрабатывать в порядке).

Я достиг этого создания отдельного пула потоков с одним потоком для каждой группы задач, которые должны выполняться последовательно. Он работает, но у меня нет ресурсов для этого. Я не контролирую количество групп, поэтому я мог бы получить нелепое количество потоков, выполняемых одновременно.

Есть ли способ сделать это с помощью пула потоков? Есть ли пул потоков с несколькими очередями блокировки, где я мог бы обеспечить последовательное выполнение для каждой очереди?

EDIT:

Просто подчеркивая то, что я сказал в моем втором абзаце: я решил это с помощью одного потока потока пула для каждой группы задач, которые должны выполняться последовательно. Однако я не могу продолжать это решение. Есть слишком много групп, и я не могу иметь все эти потоки.

Я нашел этот связанный вопрос, но поскольку он не совсем недавно, я все еще создал свою. Все, что я делаю, это попытка избежать изобретательства колеса, но, похоже, у меня нет выбора.

Имеет ли Java поток с несколькими индексами в очереди?

4b9b3361

Ответ 1

Akka, как это предлагает @SotiriosDelimanolis и @AlexeiKaigorodov, кажется многообещающим, а также @Dodd10x второй ответ, который, безусловно, решает проблему. Единственным недостатком является то, что мне пришлось бы закодировать мою собственную стратегию опроса, чтобы убедиться, что мои задачи в конечном итоге добавлены к исполнителю (например, бесконечный цикл в его примере).

С другой стороны, Striped Executor Service, предложенный @OldCurmudgeon, точно соответствует моей проблеме и работает из коробки просто как пользовательский ExecutorService.

Этот магический пул потоков гарантирует, что все Runnables с тем же stripeClass будут выполняться в том порядке, в котором они были отправлены, но StripedRunners с разными stripedClasses могут выполняться независимо. Он хотел использовать относительно небольшой пул потоков для обслуживания большого количества клиентов Java NIO, но таким образом, чтобы исполняемые файлы все равно выполнялись в порядке.

Существует даже комментарий об использовании одного потока потока пула для каждой группы (stripe), как это было предложено здесь:

Было высказано несколько предложений, таких как наличие SingleThreadExecutor для каждого stripeClass. Однако это не удовлетворяет требованию, чтобы мы могли делиться потоками между соединениями.

Я рассматриваю это как лучшее решение для его простоты и простоты использования.

Ответ 2

Если вы поддерживаете очередь для каждой группы, вы можете вытаскивать элементы из каждой очереди и подавать их в пул потоков. В приведенном ниже коде не будет уделяться первоочередное внимание какой-либо одной группе, она просто потянет их за круглую форму. Если вам нужно добавить приоритет, вы должны легко иметь возможность. Следующий код будет объединять 4 группы с использованием двух потоков (плюс поток, управляющий очередью). Вы можете использовать другой механизм очереди. Обычно я использую LinkedBlockingQueue для ситуаций, когда я хочу дождаться, когда элементы будут помещены в очередь другим потоком, что, вероятно, не то, что вы хотите, поэтому я беру опрос вместо вызова take(). Возьмите вызов, который ждет.

private Future group1Future = null;
private Future group2Future = null;
private Future group3Future = null;
private Future group4Future = null;
private LinkedBlockingQueue<Callable> group1Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group2Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group3Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group4Queue
        = new LinkedBlockingQueue<>();

private ExecutorService executor = Executors.newFixedThreadPool(2);


public void startProcessing() {
    while (true) {
        if (group1Future != null && group1Future.isDone()) {
            if (group1Queue.peek() != null) {
                group1Future = executor.submit(group1Queue.poll());
            }
        }
        if (group2Future != null && group1Future.isDone()) {
            if (group2Queue.peek() != null) {
                group2Future = executor.submit(group2Queue.poll());
            }
        }
        if (group3Future != null && group3Future.isDone()) {
            if (group3Queue.peek() != null) {
                group3Future = executor.submit(group3Queue.poll());
            }
        }

        if (group4Future != null && group4Future.isDone()) {
            if (group4Queue.peek() != null) {
                group4Future = executor.submit(group4Queue.poll());
            }
        }
    }
}

Если задача для этой группы не завершена, она перейдет к следующей группе. Одновременно будет обрабатываться не более двух групп, и ни одна группа не будет выполнять больше одной задачи. Очереди будут выполнять упорядоченное выполнение.

Ответ 3

Посмотрите на встроенную службу исполнителей потоков Java.

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html

Существует один исполнитель потока, который будет обрабатывать каждую задачу синхронно.

В ответ на раздел комментариев:

Пожалуйста, прочитайте API, прежде чем говорить, что это не сработает.
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor() Создает Исполнителя, который использует один рабочий поток, работающий с неограниченной очередью. (Обратите внимание, что если этот единственный поток завершается из-за сбоя во время выполнения до выключения, новый, если потребуется, будет занят, чтобы выполнять последующие задачи.) Гарантируется выполнение задач последовательно и не более одной задачи будет активна в любой момент времени. В отличие от эквивалентного newFixedThreadPool (1), возвращенный исполнитель гарантированно не будет перенастраиваться для использования дополнительных потоков.

Примечание: это состояния, которые гарантированно выполняются последовательно.

EDIT:

Теперь, когда я лучше понимаю ваш вопрос, у меня есть идея, которую вы могли бы попробовать. Если вы поддерживаете очередь для каждой группы, вы можете вытаскивать элементы из каждой очереди и подавать их в пул потоков. В приведенном ниже коде не будет уделяться первоочередное внимание какой-либо одной группе, она просто натягивает их на круги своя. Если вам нужно добавить приоритет, вы должны легко иметь возможность. Следующий код будет охватывать четыре группы с использованием двух потоков (плюс поток, управляющий очередью). Вы можете использовать другой механизм очереди. Обычно я использую LinkedBlockingQueue для ситуаций, когда я хочу дождаться, когда элементы будут помещены в очередь другим потоком, что, вероятно, не то, что вы хотите - вот почему я опрос, а не вызов take(). Возьмите вызов, который ждет.

private Future group1Future = null;
private Future group2Future = null;
private Future group3Future = null;
private Future group4Future = null;
private LinkedBlockingQueue<Callable> group1Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group2Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group3Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group4Queue
        = new LinkedBlockingQueue<>();

private ExecutorService executor = Executors.newFixedThreadPool(2);


public void startProcessing() {
    while (true) {
        if (group1Future != null && group1Future.isDone()) {
            if (group1Queue.peek() != null) {
                group1Future = executor.submit(group1Queue.poll());
            }
        }
        if (group2Future != null && group1Future.isDone()) {
            if (group2Queue.peek() != null) {
                group2Future = executor.submit(group2Queue.poll());
            }
        }
        if (group3Future != null && group3Future.isDone()) {
            if (group3Queue.peek() != null) {
                group3Future = executor.submit(group3Queue.poll());
            }
        }

        if (group4Future != null && group4Future.isDone()) {
            if (group4Queue.peek() != null) {
                group4Future = executor.submit(group4Queue.poll());
            }
        }
    }
}

Если задача для этой группы не завершена, она перейдет к следующей группе. Одновременно будет обрабатываться не более двух групп, и ни одна группа не будет выполнять больше одной задачи. Очереди будут принудительно исполнены.

Ответ 4

Один исполнитель потока выполнит

ExecutorService  executorService = Executors.newSingleThreadExecutor();

Что внутренне использует ThreadPoolExecutor с LinkedBlockingQueue

new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()))

Итак, вы можете использовать это для своих последовательных файлов и, возможно, использовать службу-исполнитель multi-threaded для одновременных задач

Ответ 5

Недавно я ответил на вопрос о "очереди последовательных задач" с базовой реализацией в качестве демонстрации здесь. Я полагаю, вы использовали аналогичное решение. Сравнительно легко адаптировать реализацию для использования карты списков задач и по-прежнему использовать один (фиксированный размер) исполнитель. "Лучшее решение для вас представляет , но я покажу адаптированную реализацию, чтобы продемонстрировать развязку очереди задач от исполнителя. Реализация использует обратный вызов, и поэтому нет необходимости делать опрос или сигнализацию. Поскольку используется" критический (остановить мир) раздел ", карта с очередями задач может очистить себя: никакие задачи в очереди не означают пустую карту. Недостатком" критического раздела" является то, что пропускная способность ограничена: только столько задач могут быть добавлены и удалены в секунду.

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Copied and updated from /questions/416827/editable-queue-of-tasks-running-in-background-thread/1860800#1860800
public class SerialTaskQueues {

    public static void main(String[] args) {

        // test the serial task execution using different groups
        ExecutorService executor = Executors.newFixedThreadPool(2);
        SerialTaskQueues tq = new SerialTaskQueues(executor);
        try {
            // test running the tasks one by one
            tq.add(new SleepSome("1", 30L));
            Thread.sleep(5L);
            tq.add(new SleepSome("2", 20L));
            tq.add(new SleepSome("1", 10L));

            Thread.sleep(100L);
            // all queues should be empty
            System.out.println("Queue size 1: " + tq.size("1")); // should be empty
            System.out.println("Queue size 2: " + tq.size("2")); // should be empty
            tq.add(new SleepSome("1", 10L));
            tq.add(new SleepSome("2", 20L));
            // with executor pool size set to 2, task 3 will have to wait for task 1 to complete
            tq.add(new SleepSome("3", 30L));
            tq.add(new SleepSome("1", 20L));
            tq.add(new SleepSome("2", 10L));

            Thread.sleep(100L);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdownNow();
        }
    }

    // all lookups and modifications to the list must be synchronized on the list.
    private final Map<String, GroupTasks> taskGroups = new HashMap<>();
    // make lock fair so that adding and removing tasks is balanced.
    private final ReentrantLock lock = new ReentrantLock(true);
    private final ExecutorService executor;

    public SerialTaskQueues(ExecutorService executor) {
        this.executor = executor;
    }

    public boolean add(String groupId, Runnable task) {

        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            if (gt == null) {
                gt = new GroupTasks(groupId);
                taskGroups.put(groupId, gt);
            }
            gt.tasks.add(task); 
        } finally {
            lock.unlock();
        }
        runNextTask(groupId);
        return true;
    }

    /* Utility method for testing. */
    public void add(SleepSome sleepTask) {
        add(sleepTask.groupId, sleepTask);
    }

    private void runNextTask(String groupId) {

        // critical section that ensures one task is executed.
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            if (gt.tasks.isEmpty()) {
                // only cleanup when last task has executed, prevent memory leak
                if (!gt.taskRunning.get()) {
                    taskGroups.remove(groupId);
                }
            } else if (!executor.isShutdown() && gt.taskRunning.compareAndSet(false, true)) {
                executor.execute(wrapTask(groupId, gt.taskRunning, gt.tasks.remove(0)));
            }
        } finally {
            lock.unlock();
        }
    }

    private CallbackTask wrapTask(final String groupId, final AtomicBoolean taskRunning, Runnable task) {

        return new CallbackTask(task, new Runnable() {
            @Override 
            public void run() {
                if (!taskRunning.compareAndSet(true, false)) {
                    System.out.println("ERROR: programming error, the callback should always run in execute state.");
                }
                runNextTask(groupId);
            }
        });
    }

    /** Amount of (active) task groups. */
    public int size() {

        int size = 0;
        lock.lock();
        try {
            size = taskGroups.size();
        } finally {
            lock.unlock();
        }
        return size;
    }

    public int size(String groupId) {

        int size = 0;
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            size = (gt == null ? 0 : gt.tasks.size());
        } finally {
            lock.unlock();
        }
        return size;
    }

    public Runnable get(String groupId, int index) {

        Runnable r = null;
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            r =  (gt == null ? null : gt.tasks.get(index));
        } finally {
            lock.unlock();
        }
        return r;
    }

    public Runnable remove(String groupId, int index) {

        Runnable r = null;
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            r = gt.tasks.remove(index);
            // similar to runNextTask - cleanup if there are no tasks (running) for the group 
            if (gt.tasks.isEmpty() && !gt.taskRunning.get()) {
                taskGroups.remove(groupId);
            }
        } finally {
            lock.unlock();
        }
        return r;
    }

    /* Helper class for the task-group map. */
    class GroupTasks {

        final List<Runnable> tasks = new LinkedList<Runnable>();
        // atomic boolean used to ensure only 1 task is executed at any given time
        final AtomicBoolean taskRunning = new AtomicBoolean(false);
        final String groupId;

        GroupTasks(String groupId) {
            this.groupId = groupId;
        }
    }

    // general callback-task, see https://stackoverflow.com/a/826283/3080094
    static class CallbackTask implements Runnable {

        private final Runnable task, callback;

        public CallbackTask(Runnable task, Runnable callback) {
            this.task = task;
            this.callback = callback;
        }

        @Override 
        public void run() {

            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    callback.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // task that just sleeps for a while
    static class SleepSome implements Runnable {

        static long startTime = System.currentTimeMillis();

        private final String groupId;
        private final long sleepTimeMs;
        public SleepSome(String groupId, long sleepTimeMs) {
            this.groupId = groupId;
            this.sleepTimeMs = sleepTimeMs;
        }
        @Override public void run() {
            try { 
                System.out.println(tdelta(groupId) + "Sleeping for " + sleepTimeMs + " ms.");
                Thread.sleep(sleepTimeMs);
                System.out.println(tdelta(groupId) + "Slept for " + sleepTimeMs + " ms.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private String tdelta(String groupId) { return String.format("% 4d [%s] ", (System.currentTimeMillis() - startTime), groupId); }
    }
}

Ответ 6

Что вам нужно, это не специальный исполнитель, а способ выражения зависимостей между задачами. Вместо группы задач, которые должны выполняться последовательно, подумайте о задаче, которая в конце выполнения отправляет сигнал следующей задаче, тем самым начиная ее выполнение. Таким образом, ваша задача может быть закодирована как актер, который ждет разрешения запуска сигнала. Рассмотрим Akka или любую другую актерскую библиотеку (например, my df4j).