Подтвердить что ты не робот

Как реализовать PriorityBlockingQueue с помощью ThreadPoolExecutor и настраиваемых задач

Я много искал, но не смог найти решение своей проблемы.

У меня есть свой собственный класс, BaseTask, который использует ThreadPoolExecutor для обработки задач. Я хочу приоритезации задач, но когда я пытаюсь использовать PriorityBlockingQueue я получаю ClassCastException потому что ThreadPoolExecutor оборачивает мои задачи в объект FutureTask.

Это, очевидно, имеет смысл, потому что FutureTask не реализует Comparable, но как бы я идти, чтобы решить эту проблему приоритета? Я читал, что вы можете переопределить newTaskFor() в ThreadPoolExecutor, но я не могу найти этот метод вообще...?

Любые предложения будут высоко ценится!

Некоторый код, чтобы помочь:

В моем классе BaseTask меня есть

private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
    1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

private final BaseFutureTask<Result> mFuture;

public BaseTask(int priority) {
    mFuture = new BaseFutureTask<Result>(mWorker, priority);
}

public final BaseTask<Params, Progress, Result> execute(Params... params) {

    /* Some unimportant code here */

    sExecutor.execute(mFuture);
}

В классе BaseFutureTask

@Override
public int compareTo(BaseFutureTask another) {
    long diff = this.priority - another.priority;

    return Long.signum(diff);
}

В классе BaseThreadPoolExecutor я переопределяю 3 метода submit... BaseThreadPoolExecutor конструктор в этом классе, но ни один из методов submit

4b9b3361

Ответ 1

public class ExecutorPriority {

public static void main(String[] args) {

    PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority());

    Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);
    exe.execute(new RunWithPriority(2) {

        @Override
        public void run() {

            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });
    exe.execute(new RunWithPriority(10) {

        @Override
        public void run() {
            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });

}

private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {
        return o1.getPriority().compareTo(o2.getPriority());
    }
}

}

как вы можете догадаться, RunWithPriority - это абстрактный класс, который является Runnable и имеет поле приоритета Integer

Ответ 2

Вы можете использовать эти вспомогательные классы:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

И

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

И этот вспомогательный метод:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

И, тогда используйте его следующим образом:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

Ответ 3

Мое решение:

public class XThreadPoolExecutor extends ThreadPoolExecutor
{
    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory, RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
    {
        return new ComparableFutureTask<>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
    {
        return new ComparableFutureTask<>(callable);
    }

    protected class ComparableFutureTask<V>
        extends FutureTask<V> implements Comparable<ComparableFutureTask<V>>
    {
        private Object object;
        public ComparableFutureTask(Callable<V> callable)
        {
            super(callable);
            object = callable;
        }

        public ComparableFutureTask(Runnable runnable, V result)
        {
            super(runnable, result);
            object = runnable;
        }

        @Override
        @SuppressWarnings("unchecked")
        public int compareTo(ComparableFutureTask<V> o)
        {
            if (this == o)
            {
                return 0;
            }
            if (o == null)
            {
                return -1; // high priority
            }
            if (object != null && o.object != null)
            {
                if (object.getClass().equals(o.object.getClass()))
                {
                    if (object instanceof Comparable)
                    {
                        return ((Comparable) object).compareTo(o.object);
                    }
                }
            }
            return 0;
        }
    }
}

Ответ 4

Я попытаюсь объяснить эту проблему полностью функциональным кодом. Но прежде чем погрузиться в код, я хотел бы рассказать об PriorityBlockingQueue

PriorityBlockingQueue: PriorityBlockingQueue - это реализация BlockingQueue. Он принимает задачи вместе с их приоритетом и сначала отправляет задачу с наивысшим приоритетом для выполнения. Если какие-либо две задачи имеют один и тот же приоритет, нам нужно предоставить некоторую пользовательскую логику, чтобы решить, какая задача будет первой.

Теперь давайте сразу перейдем в код.

Класс драйвера. Этот класс создает исполнителя, который принимает задачи и затем отправляет их на исполнение. Здесь мы создаем две задачи с приоритетом LOW, а другая с высоким приоритетом. Здесь мы говорим исполнителю запустить MAX из 1 потока и использовать PriorityBlockingQueue.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);

    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
}

Класс MyTask: MyTask реализует Runnable и принимает приоритет в качестве аргумента в конструкторе. Когда эта задача выполняется, она печатает сообщение, а затем помещает поток в режим ожидания в течение 1 секунды.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

Класс MyFutureTask. Поскольку мы используем PriorityBlocingQueue для выполнения наших задач, наши задачи должны быть завернуты в FutureTask, и наша реализация FutureTask должна реализовать интерфейс Comparable. Интерфейс Comparable сравнивает приоритет двух разных задач и отправляет задачу с наивысшим приоритетом для выполнения.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Класс приоритета: Самоочевидный класс приоритета.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Теперь, когда мы запускаем этот пример, мы получаем следующий вывод

The following Runnable is getting executed High
The following Runnable is getting executed Low

Несмотря на то, что мы сначала перенесли приоритет LOW, но HIGH приоритет, но поскольку мы используем PriorityBlockingQueue, задача с более высоким приоритетом будет выполняться первой.

Ответ 5

Похоже, что они оставили это из гармонии апача. Существует svn commit log около года назад, фиксируя отсутствие newTaskFor. Вероятно, вы можете просто переопределить функции submit в расширенном ThreadPoolExecutor, чтобы создать расширенный FutureTask, который Comparable. Они не очень длинны.

Ответ 6

Чтобы ответить на ваш вопрос: метод newTaskFor() находится в ThreadPoolExecutor суперклассе, AbstractExecutorService. Однако вы можете просто переопределить его в ThreadPoolExecutor.

Ответ 7

Этот ответ является упрощенной версией ответа @StanislavVitvitskyy. Спасибо ему.

Я хотел сделать Comparable задания, которые я представил. Я создал ExecutorService с PriorityBlockingQueue и расширил его для обработки newTaskFor(...):

ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
    keepAliveTime, timeUnit, new PriorityBlockingQueue<Runnable>()) {

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ComparableFutureTask<T>(runnable, value);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new ComparableFutureTask<T>(callable);
    };
};

Я определил ComparableFutureTask, который проходит FutureTask и реализует Comparable делегируя к job.compareTo(...), которые представляются в бассейн.

public class ComparableFutureTask<T> extends FutureTask<T>
    implements Comparable<Object> {

    private final Comparable<Object> comparableJob;

    @SuppressWarnings("unchecked")
    public ComparableFutureTask(Runnable runnable, T value) {
        super(runnable, value);
        this.comparableJob = (Comparable<Object>) runnable;
    }

    @SuppressWarnings("unchecked")
    public ComparableFutureTask(Callable<T> callable) {
        super(callable);
        this.comparableJob = (Comparable<Object>) callable;
    }

    @Override
    public int compareTo(Object o) {
        return this.comparableJob
            .compareTo(((ComparableFutureTask<?>) o).comparable);
    }
}

Это ExecutorService затем может обрабатывать Runnable или Callable рабочие места, которые также Comparable. Например:

public class MyJob implements Runnable, Comparable<MyJob> {
    private int priority;
    ...
    @Override
    public int compareTo(MyJob other) {
        // we want higher priority to go first
        return other.priority - this.priority;
    }
    ...
}

Важно отметить, что если вы отправите задание, которое не Comparable с этой очередью, оно вызовет ClassCastException.