Я отправил кучу заданий в службу исполнителей на Java, и я как-то хочу временно приостановить все эти задания. Какой лучший способ сделать это? Как я могу возобновить? Или я делаю это совершенно неправильно? Должен ли я следовать другим шаблонам для того, чего я хочу достичь (т.е. Возможность приостанавливать/возобновлять службы исполнения)?
Как приостановить/возобновить все потоки в ExecutorService в Java?
Ответ 1
Для того, чтобы ответить на мой собственный вопрос, я нашел пример PausableThreadPoolExecutor
в Javadocs из ThreadPoolExecutor
самого. Вот моя версия с использованием мониторов Guava:
import com.google.common.util.concurrent.Monitor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
public class PausableExecutor extends ScheduledThreadPoolExecutor {
private boolean isPaused;
private final Monitor monitor = new Monitor();
private final Monitor.Guard paused = new Monitor.Guard(monitor) {
@Override
public boolean isSatisfied() {
return isPaused;
}
};
private final Monitor.Guard notPaused = new Monitor.Guard(monitor) {
@Override
public boolean isSatisfied() {
return !isPaused;
}
};
public PausableExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
monitor.enterWhenUninterruptibly(notPaused);
try {
monitor.waitForUninterruptibly(notPaused);
} finally {
monitor.leave();
}
}
public void pause() {
monitor.enterIf(notPaused);
try {
isPaused = true;
} finally {
monitor.leave();
}
}
public void resume() {
monitor.enterIf(paused);
try {
isPaused = false;
} finally {
monitor.leave();
}
}
}
Ответ 2
Я сделал несколько критических замечаний по поводу вашего принятого ответа, но они не были очень конструктивными... Так что вот мое решение. Я хотел бы использовать такой класс, как этот, а затем вызывать checkIn
везде, где и когда я захочу сделать паузу. Найдите это на GitHub !
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Provides a mechanism to pause multiple threads.
* If wish your thread to participate, then it must regularly check in with an instance of this object.
*
* @author Corin Lawson <[email protected]>
*/
public class Continue {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public void checkIn() throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.await();
} finally {
pauseLock.unlock();
}
}
}
public void checkInUntil(Date deadline) throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.awaitUntil(deadline);
} finally {
pauseLock.unlock();
}
}
}
public void checkIn(long nanosTimeout) throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.awaitNanos(nanosTimeout);
} finally {
pauseLock.unlock();
}
}
}
public void checkIn(long time, TimeUnit unit) throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.await(time, unit);
} finally {
pauseLock.unlock();
}
}
}
public void checkInUninterruptibly() {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.awaitUninterruptibly();
} finally {
pauseLock.unlock();
}
}
}
public boolean isPaused() {
return isPaused;
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
if (isPaused) {
isPaused = false;
unpaused.signalAll();
}
} finally {
pauseLock.unlock();
}
}
}
Например:
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
public class PausableExecutor extends ScheduledThreadPoolExecutor {
private Continue cont;
public PausableExecutor(int corePoolSize, ThreadFactory threadFactory, Continue c) {
super(corePoolSize, threadFactory);
cont = c;
}
protected void beforeExecute(Thread t, Runnable r) {
cont.checkIn();
super.beforeExecute(t, r);
}
}
Это дает дополнительное преимущество, заключающееся в том, что вы можете приостановить многие потоки одним вызовом " Continue
pause
.
Ответ 3
Я искал функциональность паузы/возобновления в исполнителе, но с дополнительной возможностью ждать каких-либо обрабатываемых в настоящее время задач. Ниже представлен вариант других больших реализаций из этого SO с добавлением функций ожидания. Я тестировал его на исполнителе с помощью одного потока. Таким образом, базовое использование:
executor.pause();
executor.await(10000); // blocks till current tasks processing ends
код класса:
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class PausableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
public boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
private Latch activeTasksLatch = new Latch();
private class Latch {
private final Object synchObj = new Object();
private int count;
public boolean awaitZero(long waitMS) throws InterruptedException {
long startTime = System.currentTimeMillis();
synchronized (synchObj) {
while (count > 0) {
if ( waitMS != 0) {
synchObj.wait(waitMS);
long curTime = System.currentTimeMillis();
if ( (curTime - startTime) > waitMS ) {
return count <= 0;
}
}
else
synchObj.wait();
}
return count <= 0;
}
}
public void countDown() {
synchronized (synchObj) {
if (--count <= 0) {
// assert count >= 0;
synchObj.notifyAll();
}
}
}
public void countUp() {
synchronized (synchObj) {
count++;
}
}
}
/**
* Default constructor for a simple fixed threadpool
*/
public PausableScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}
/**
* Executed before a task is assigned to a thread.
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
pauseLock.lock();
try {
while (isPaused)
unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
activeTasksLatch.countUp();
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
super.afterExecute(r, t);
}
finally {
activeTasksLatch.countDown();
}
}
/**
* Pause the threadpool. Running tasks will continue running, but new tasks
* will not start untill the threadpool is resumed.
*/
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
/**
* Wait for all active tasks to end.
*/
public boolean await(long timeoutMS) {
// assert isPaused;
try {
return activeTasksLatch.awaitZero(timeoutMS);
} catch (InterruptedException e) {
// log e, or rethrow maybe
}
return false;
}
/**
* Resume the threadpool.
*/
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
Ответ 4
Проблема в том, что сами Runnable/Callable должны проверять, когда нужно приостановить/возобновить. Это говорит о том, что есть много способов сделать это, и это зависит от ваших требований относительно того, как это сделать. Независимо от вашего решения вам необходимо сделать прерывание ожидания, поэтому поток может быть отключен.
Ответ 5
Я знаю, что это старый, но я пробовал все эти ответы, и ни один из них не работал для того, что я пытался сделать с паузой таймера; все они выбросили бы все данные, которые он делал бы по расписанию, как только он возобновился (все сразу).
Вместо этого я нашел этот класс Timer
на GitHub * здесь. Это сработало очень хорошо для меня.
* Я не писал этот код, просто нашел его.