Подтвердить что ты не робот

Как остановить Runnable, запланированное на повторное выполнение после определенного количества исполнений

Ситуация

У меня есть Runnable. У меня есть класс, который планирует этот Runnable для выполнения с помощью ScheduledExecutorService с scheduleWithFixedDelay.

Цель

Я хочу изменить этот класс, чтобы запланировать выполнение Runnable для фиксированной задержки либо на неопределенное время, либо до тех пор, пока оно не будет запущено определенное количество раз, в зависимости от какого-либо параметра, который передается конструктору.

Если возможно, я хотел бы использовать тот же Runnable, что и концептуально то же самое, что должно быть "run".

Возможные подходы

Подход № 1

У вас есть две Runnables, одна из которых отменяет расписание после нескольких исполнений (в которых она хранится), а другая:

public class MyClass{
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public enum Mode{
        INDEFINITE, FIXED_NO_OF_TIMES
    }

    public MyClass(Mode mode){
        if(mode == Mode.INDEFINITE){
            scheduler.scheduleWithFixedDelay(new DoSomethingTask(), 0, 100, TimeUnit.MILLISECONDS);
        }else if(mode == Mode.FIXED_NO_OF_TIMES){
            scheduler.scheduleWithFixedDelay(new DoSomethingNTimesTask(), 0, 100, TimeUnit.MILLISECONDS);
        }
    }

    private class DoSomethingTask implements Runnable{
        @Override
        public void run(){
            doSomething();
        }
    }

    private class DoSomethingNTimesTask implements Runnable{
        private int count = 0;

        @Override
        public void run(){
            doSomething();
            count++;
            if(count > 42){
                // Cancel the scheduling.
                // Can you do this inside the run method, presumably using
                // the Future returned by the schedule method? Is it a good idea?
            }
        }
    }

    private void doSomething(){
        // do something
    }
}

Я предпочел бы просто запустить Runnable для выполнения метода doSomething. Привязывание планирования к Runnable кажется неправильным. Что вы думаете об этом?

Подход № 2

У вас есть один Runnable для выполнения кода, который мы хотим запустить периодически. Имеет отдельную запланированную runnable, которая проверяет, сколько раз первый Runnable запускал и отменяет, когда он достигает определенной суммы. Это может быть неточным, так как оно будет асинхронным. Это кажется немного громоздким. Что вы думаете об этом?

Подход № 3

Расширьте ScheduledExecutorService и добавьте метод "scheduleWithFixedDelayNTimes". Может быть, такой класс уже существует? В настоящее время я использую Executors.newSingleThreadScheduledExecutor(); для получения экземпляра ScheduledExecutorService. Я предположительно должен был бы реализовать аналогичные функции для создания экземпляра расширенного ScheduledExecutorService. Это может быть сложно. Что вы думаете об этом?

Нет подхода к планировщику [Изменить]

Я не мог использовать планировщик. Вместо этого я мог бы иметь что-то вроде:

for(int i = 0; i < numTimesToRun; i++){
    doSomething();
    Thread.sleep(delay);
}

И запустите это в каком-то потоке. Что вы думаете об этом? Вы потенциально можете использовать runnable и напрямую вызвать метод run.


Любые предложения приветствуются. Я ищу дебаты, чтобы найти способ "наилучшей практики" для достижения моей цели.

4b9b3361

Ответ 1

Вы можете использовать метод cancel() для Future. Из javadocs scheduleAtFixedRate

Otherwise, the task will only terminate via cancellation or termination of the executor

Вот примерный код, который обертывает Runnable в другой, который отслеживает количество запусков оригинала и отменяет его после N раз.

public void runNTimes(Runnable task, int maxRunCount, long period, TimeUnit unit, ScheduledExecutorService executor) {
    new FixedExecutionRunnable(task, maxRunCount).runNTimes(executor, period, unit);
}

class FixedExecutionRunnable implements Runnable {
    private final AtomicInteger runCount = new AtomicInteger();
    private final Runnable delegate;
    private volatile ScheduledFuture<?> self;
    private final int maxRunCount;

    public FixedExecutionRunnable(Runnable delegate, int maxRunCount) {
        this.delegate = delegate;
        this.maxRunCount = maxRunCount;
    }

    @Override
    public void run() {
        delegate.run();
        if(runCount.incrementAndGet() == maxRunCount) {
            boolean interrupted = false;
            try {
                while(self == null) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                self.cancel(false);
            } finally {
                if(interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public void runNTimes(ScheduledExecutorService executor, long period, TimeUnit unit) {
        self = executor.scheduleAtFixedRate(this, 0, period, unit);
    }
}

Ответ 2

Цитируется из описания API ( ScheduledExecutorService.scheduleWithFixedDelay):

Создает и выполняет периодическое действие, которое становится активным в первую очередь после заданной начальной задержки, а затем с заданной задержкой между прекращением одного выполнения и началом следующего. Если какое-либо выполнение задачи встречает исключение, последующие выполнения подавляются. В противном случае задание будет прекращено только путем отмены или прекращения исполнителя.

Итак, проще всего было бы "просто выбросить исключение" (даже если это считается плохой практикой):

static class MyTask implements Runnable {

    private int runs = 0;

    @Override
    public void run() {
        System.out.println(runs);
        if (++runs >= 20)
            throw new RuntimeException();
    }
}

public static void main(String[] args) {
    ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
    s.scheduleWithFixedDelay(new MyTask(), 0, 100, TimeUnit.MILLISECONDS);
}

Ответ 3

Решение sbridges до сих пор является самым чистым, за исключением того, что вы упомянули, что оно не отвечает за обработку количества исполнений самому Runnable. Это не должно касаться этого, вместо этого повторения должны быть параметром класса, обрабатывающего планирование. Чтобы достичь этого, я бы предложил следующий дизайн, который вводит новый класс исполнителя для Runnables. Класс предоставляет два общедоступных метода планирования задач, которые являются стандартными Runnables, с конечным или бесконечным повторением. Тот же Runnable может быть передан для конечного и бесконечного планирования, если это необходимо (что невозможно при всех предлагаемых решениях, расширяющих класс Runnable для обеспечения конечных повторений). Обработка отмены конечных повторений полностью инкапсулируется в классе планировщика:

class MaxNScheduler
{

  public enum ScheduleType 
  {
     FixedRate, FixedDelay
  }

  private ScheduledExecutorService executorService =
     Executors.newSingleThreadScheduledExecutor();

  public ScheduledFuture<?> scheduleInfinitely(Runnable task, ScheduleType type, 
    long initialDelay, long period, TimeUnit unit)
  {
    return scheduleNTimes(task, -1, type, initialDelay, period, unit);
  }

  /** schedule with count repetitions */
  public ScheduledFuture<?> scheduleNTimes(Runnable task, int repetitions, 
    ScheduleType type, long initialDelay, long period, TimeUnit unit) 
  {
    RunnableWrapper wrapper = new RunnableWrapper(task, repetitions);
    ScheduledFuture<?> future;
    if(type == ScheduleType.FixedDelay)
      future = executorService.scheduleWithFixedDelay(wrapper, 
         initialDelay, period, TimeUnit.MILLISECONDS);
    else
      future = executorService.scheduleAtFixedRate(wrapper, 
         initialDelay, period, TimeUnit.MILLISECONDS);
    synchronized(wrapper)
    {
       wrapper.self = future;
       wrapper.notify(); // notify wrapper that it nows about it future (pun intended)
    }
    return future;
  }

  private static class RunnableWrapper implements Runnable 
  {
    private final Runnable realRunnable;
    private int repetitions = -1;
    ScheduledFuture<?> self = null;

    RunnableWrapper(Runnable realRunnable, int repetitions) 
    {
      this.realRunnable = realRunnable;
      this.repetitions = repetitions;
    }

    private boolean isInfinite() { return repetitions < 0; }
    private boolean isFinished() { return repetitions == 0; }

    @Override
    public void run()
    {
      if(!isFinished()) // guard for calls to run when it should be cancelled already
      {
        realRunnable.run();

        if(!isInfinite())
        {
          repetitions--;
          if(isFinished())
          {
            synchronized(this) // need to wait until self is actually set
            {
              if(self == null)
              {
                 try { wait(); } catch(Exception e) { /* should not happen... */ }
              }
              self.cancel(false); // cancel gracefully (not throwing InterruptedException)
            }
          }
        }
      }
    }
  }

}

Справедливости ради следует, что логика управления повторениями по-прежнему существует с Runnable, но она является Runnable полностью внутренней для MaxNScheduler, тогда как задача Runnable, переданная для планирования, не должна касаться сам с характером планирования. Также эта проблема может быть легко перенесена в планировщик, если это необходимо, путем предоставления некоторого обратного вызова каждый раз, когда выполнялся RunnableWrapper.run. Это немного усложнит код и добавит необходимость сохранить некоторую карту RunnableWrapper и соответствующие повторения, поэтому я решил сохранить счетчики в классе RunnableWrapper.

Я также добавил некоторую синхронизацию на обертку при настройке self. Это необходимо как теоретически, когда завершение исполнения, я, возможно, еще не был назначен (теоретический сценарий, но только для одного повторения).

Отмена обрабатывается изящно, не выкидывая InterruptedException, и в случае, если до ее отмены выполнено очередное раунд, RunnableWrapper не будет вызывать базовый Runnable.

Ответ 4

Ваш первый подход кажется ОК. Вы могли бы объединить оба типа runnables, передав объект mode его конструктору (или передав -1 как максимальное количество раз, когда он должен выполняться), и используйте этот режим, чтобы определить, нужно ли отменять runnable или нет:

private class DoSomethingNTimesTask implements Runnable{
    private int count = 0;
    private final int limit;

    /**
     * Constructor for no limit
     */
    private DoSomethingNTimesTask() {
        this(-1);
    }

    /**
     * Constructor allowing to set a limit
     * @param limit the limit (negative number for no limit)
     */
    private DoSomethingNTimesTask(int limit) {
        this.limit = limit;
    }

    @Override
    public void run(){
        doSomething();
        count++;
        if(limit >= 0 && count > limit){
            // Cancel the scheduling
        }
    }
}

Вам нужно будет передать запланированное будущее вашей задаче, чтобы она могла отменить себя, или вы можете исключить исключение.

Ответ 5

Вот мое предложение (я считаю, что он обрабатывает все случаи, упомянутые в вопросе):

public class RepeatedScheduled implements Runnable {

    private int repeatCounter = -1;
    private boolean infinite;

    private ScheduledExecutorService ses;
    private long initialDelay;
    private long delay;
    private TimeUnit unit;

    private final Runnable command;
    private Future<?> control;

    public RepeatedScheduled(ScheduledExecutorService ses, Runnable command,
        long initialDelay, long delay, TimeUnit unit) {

        this.ses = ses;
        this.initialDelay = initialDelay;
        this.delay = delay;
        this.unit = unit;

        this.command = command;
        this.infinite = true;

    }

    public RepeatedScheduled(ScheduledExecutorService ses, Runnable command,
        long initialDelay, long delay, TimeUnit unit, int maxExecutions) {

        this(ses, command, initialDelay, delay, unit);
        this.repeatCounter = maxExecutions;
        this.infinite = false;

    }

    public Future<?> submit() {

        // We submit this, not the received command
        this.control = this.ses.scheduleWithFixedDelay(this,
            this.initialDelay, this.delay, this.unit);

        return this.control;

    }

    @Override
    public synchronized void run() {

        if ( !this.infinite ) {
            if ( this.repeatCounter > 0 ) {
                this.command.run();
                this.repeatCounter--;
            } else {
                this.control.cancel(false);
            }
        } else {
            this.command.run();
        }

    }

}

Кроме того, он позволяет внешней стороне останавливать все из Future, возвращаемого методом submit().

Использование:

Runnable MyRunnable = ...;
// Repeat 20 times
RepeatedScheduled rs = new RepeatedScheduled(
    MySes, MyRunnable, 33, 44, TimeUnit.SECONDS, 20);
Future<?> MyControl = rs.submit();
...

Ответ 6

Я искал точно org.springframework.scheduling.Trigger же функциональность и выбрал org.springframework.scheduling.Trigger.

Ниже приведен пример полного тестирования (извините, если в коде слишком много данных) applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:task="http://www.springframework.org/schema/task"
 xmlns:util="http://www.springframework.org/schema/util"
 xmlns:context="http://www.springframework.org/schema/context"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context/ http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util/ http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <bean id="blockingTasksScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
        <property name="poolSize" value="10" />
    </bean>

    <task:scheduler id="deftaskScheduler" pool-size="10" />

</beans>

ДЖАВА

package com.alz.springTests.schedulerTest;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class ScheduledTest {

    private static ApplicationContext applicationContext;
    private static TaskScheduler taskScheduler;

    private static final class SelfCancelableTask implements Runnable, Trigger {
        Date creationTime = new Date();
        AtomicInteger counter = new AtomicInteger(0);
        private volatile boolean shouldStop = false;
        private int repeatInterval = 3; //seconds

        @Override
        public void run() {
            log("task: run started");

            // simulate "doing job" started
            int sleepTimeMs = ThreadLocalRandom.current().nextInt(500, 2000+1);
            log("will sleep " + sleepTimeMs + " ms");
            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // "doing job" finished

            int i = counter.incrementAndGet();
            if (i > 5) { //cancel myself
                logErr("Attempts exceeded, will mark as shouldStop");
                shouldStop = true;

            } else {
                log("task: executing cycle #"+i);
            }
        }

        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            log("nextExecutionTime: triggerContext.lastActualExecutionTime() " + triggerContext.lastActualExecutionTime());
            log("nextExecutionTime: triggerContext.lastCompletionTime() " + triggerContext.lastCompletionTime());
            log("nextExecutionTime: triggerContext.lastScheduledExecutionTime() " + triggerContext.lastScheduledExecutionTime());

            if (shouldStop) 
                return null;

            if (triggerContext.lastCompletionTime() == null) {
                LocalDateTime ldt = creationTime.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime().plus(repeatInterval, ChronoUnit.SECONDS);
                return Date.from(ldt.atZone(ZoneId.systemDefault()).toInstant());
            } else {
                LocalDateTime ldt = triggerContext.lastCompletionTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime().plus(repeatInterval, ChronoUnit.SECONDS);
                return Date.from(ldt.atZone(ZoneId.systemDefault()).toInstant());               
            }

        }

    }

    private static void log(String log) {
        System.out.printf("%s [%s] %s\r\n", LocalDateTime.now(), Thread.currentThread(), log);
    }

    private static void logErr(String log) {
        System.err.printf("%s [%s] %s\r\n", LocalDateTime.now(), Thread.currentThread(), log);
    }

    public static void main(String[] args) {

        log("main: Stated...");

        applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");

        taskScheduler = (TaskScheduler) applicationContext.getBean("blockingTasksScheduler");

        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = ((ThreadPoolTaskScheduler)taskScheduler).getScheduledThreadPoolExecutor();

        SelfCancelableTask selfCancelableTask = new SelfCancelableTask();
        taskScheduler.schedule(selfCancelableTask, selfCancelableTask);


        int waitAttempts = 0;
        while (waitAttempts < 30) {
            log("scheduledPool pending tasks: " + scheduledThreadPoolExecutor.getQueue().size());

            try {
                Thread.sleep(1*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            waitAttempts++;

        }

        log("main: Done!");


    }

}

Ответ 7

Для вариантов использования, таких как опрос до определенного времени ожидания, мы можем подойти с более простым решением, используя Future.get().

/* Define task */
public class Poll implements Runnable {
    @Override
    public void run() {
        // Polling logic
    }
}

/* Create executor service */
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

/* Schedule task - poll every 500ms */
ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new Poll(), 0, 500, TimeUnit.MILLISECONDS);

/* Wait till 60 sec timeout */
try {
    future.get(60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    scheduledFuture.cancel(false);
    // Take action on timeout
}