Подтвердить что ты не робот

Как реализовать приоритезацию задач с помощью ExecutorService в Java 5?

Я реализую механизм объединения потоков, в котором я хотел бы выполнять задачи с разными приоритетами. Я хотел бы иметь хороший механизм, благодаря которому я могу отправить высокоприоритетную задачу службе и планировать ее перед другими задачами. Приоритет задачи - это внутреннее свойство самой задачи (я говорю, что эта задача как Callable или Runnable для меня не важна).

Теперь, внешне, похоже, что я мог использовать PriorityBlockingQueue в качестве очереди задач в моей ThreadPoolExecutor, но эта очередь содержит объекты Runnable, которые могут быть или не быть заданиями Runnable, которые я отправил к нему. Более того, если я отправил задачи Callable, это не ясно, как это когда-либо будет отображаться.

Есть ли способ сделать это? Я бы предпочел не отказываться от этого, потому что я гораздо чаще ошибаюсь.

(В стороне, да, я знаю о возможности голодания для низкоприоритетных работ в чем-то подобном. Дополнительные баллы (?!) для решений, которые имеют разумную гарантию справедливости)

4b9b3361

Ответ 1

Поначалу румянец, казалось бы, вы могли бы определить интерфейс для своих задач, который расширяет Runnable или Callable<T> и Comparable. Затем заверните ThreadPoolExecutor с PriorityBlockingQueue в качестве очереди и принимайте только те задачи, которые реализуют ваш интерфейс.

Принимая во внимание ваш комментарий, он выглядит как один из вариантов расширения ThreadPoolExecutor и переопределения методов submit(). Обратитесь к AbstractExecutorService, чтобы узнать, как выглядят по умолчанию; все, что они делают, это обернуть Runnable или Callable в FutureTask и execute(). Я бы это сделал, написав класс-оболочку, который реализует ExecutorService и делегирует анонимный внутренний ThreadPoolExecutor. Оберните их тем, что имеет ваш приоритет, чтобы ваш Comparator мог получить его.

Ответ 2

Я решил эту проблему разумным образом, и я опишу ее ниже для будущей ссылки на меня и на всех, кто сталкивается с этой проблемой с библиотеками Java Concurrent.

Использование PriorityBlockingQueue в качестве средства для выполнения задач для последующего выполнения - это действительно движение в правильном направлении. Проблема заключается в том, что PriorityBlockingQueue должен быть в общем создан экземпляром Runnable экземпляров, и невозможно вызвать compareTo (или аналогичный) в интерфейсе Runnable.

На решение проблемы. При создании Исполнителя необходимо предоставить PriorityBlockingQueue. Очередь также должна быть предоставлена ​​настраиваемому Компаратору для правильной сортировки по месту:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Теперь загляните в CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

Все выглядит довольно прямо вперед до этого момента. Здесь он немного липкий. Наша следующая проблема заключается в создании FutureTasks от Исполнителя. В Executor мы должны переопределить newTaskFor так:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Где c - задача Callable, которую мы пытаемся выполнить. Теперь давайте посмотрим на CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Обратите внимание на метод getTask. Мы собираемся использовать это позже, чтобы захватить исходную задачу из этого CustomFutureTask, который мы создали.

И, наконец, измените исходную задачу, которую мы пытаемся выполнить:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

Вы можете видеть, что мы реализуем Comparable в задаче для делегирования фактическому Comparator для MyType.

И у вас есть это, настроенная приоритизация для Executor с использованием библиотек Java! Это требует немного гибки, но это самое чистое, что я смог придумать. Надеюсь, это кому-то поможет!

Ответ 3

Вы можете использовать эти вспомогательные классы:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

И

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

И этот вспомогательный метод:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

И, тогда используйте его следующим образом:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

Ответ 4

Я попытаюсь объяснить эту проблему полностью функциональным кодом. Но прежде чем погрузиться в код, я хотел бы рассказать об PriorityBlockingQueue

PriorityBlockingQueue: PriorityBlockingQueue - это реализация BlockingQueue. Он принимает задачи вместе с их приоритетом и сначала отправляет задачу с наивысшим приоритетом для выполнения. Если какие-либо две задачи имеют один и тот же приоритет, нам нужно предоставить некоторую пользовательскую логику, чтобы решить, какая задача будет первой.

Теперь давайте сразу перейдем в код.

Класс драйвера. Этот класс создает исполнителя, который принимает задачи и затем отправляет их на исполнение. Здесь мы создаем две задачи с приоритетом LOW, а другая с высоким приоритетом. Здесь мы говорим исполнителю запустить MAX из 1 потока и использовать PriorityBlockingQueue.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);


    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

Класс MyTask: MyTask реализует Runnable и принимает приоритет в качестве аргумента в конструкторе. Когда эта задача выполняется, она печатает сообщение, а затем помещает поток в режим ожидания в течение 1 секунды.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

Класс MyFutureTask. Поскольку мы используем PriorityBlocingQueue для выполнения наших задач, наши задачи должны быть завернуты в FutureTask, и наша реализация FutureTask должна реализовать интерфейс Comparable. Интерфейс Comparable сравнивает приоритет двух разных задач и отправляет задачу с наивысшим приоритетом для выполнения.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Класс приоритета: Самоочевидный класс приоритета.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Теперь, когда мы запускаем этот пример, мы получаем следующий вывод

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Несмотря на то, что мы сначала отправили приоритет LOW, но HIGH приоритет, но поскольку мы используем PriorityBlockingQueue, любая задача с более высоким приоритетом будет выполняться первой.

Ответ 5

Мое решение сохраняет порядок подчинения задач для одних и тех же приоритетов. Это улучшение этого ответа

Порядок выполнения задач основан на:

  • Priority
  • Отправить заказ (в пределах того же приоритета)

Класс тестера:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Результат:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

Первая задача - это A1, потому что не было более высокого приоритета в очереди, когда она была вставлена. B - один приоритет, выполняемый ранее, задачи - это приоритет 0, который затем выполняется позже, но порядок выполнения - порядок подчинения: B1, B2, B3,... A2, A3, A4...

Решение:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

Ответ 6

Возможно ли иметь один ThreadPoolExecutor для каждого уровня приоритета? A ThreadPoolExecutor может быть инициирован с помощью ThreadFactory, и вы можете иметь собственную реализацию ThreadFactory, чтобы установить различные уровни приоритета.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }