Подтвердить что ты не робот

Используя Spring @Scheduled и @Async вместе

Вот мой прецедент.

Устаревшая система обновляет таблицу очередей базы данных QUEUE.

Мне нужна запланированная повторяющаяся работа, которая - проверяет содержимое QUEUE - если в таблице есть строки, они блокируют строку и выполняют некоторую работу - удаляет строку в QUEUE

Если предыдущее задание все еще запущено, то для выполнения этой работы будет создан новый поток. Я хочу настроить максимальное количество одновременных потоков.

Я использую Spring 3, и мое текущее решение состоит в том, чтобы сделать следующее (используя fixedRate 1 миллисекунды, чтобы потоки выполнялись в основном непрерывно)

@Scheduled(fixedRate = 1)
@Async
public void doSchedule() throws InterruptedException {
    log.debug("Start schedule");
    publishWorker.start();
    log.debug("End schedule");
}

<task:executor id="workerExecutor" pool-size="4" />

Это создало 4 потока прямо, и потоки правильно распределяли рабочую нагрузку из очереди. Однако я, кажется, получаю утечку памяти, когда потоки занимают много времени.

java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0                              |              80 |   373,410,496 |     89.74%
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940                          |           48 |   373,410,136 |     89.74%
|  |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68  

Итак,

1: Должен ли я использовать @Async и @Scheduled вместе?

2: Если нет, то как еще я могу использовать Spring для выполнения моих требований?

3: Как создать новые потоки только тогда, когда другие потоки заняты?

Спасибо всем!

EDIT: Я думаю, что очередь заданий становилась бесконечно длинной... Теперь, используя

    <task:executor id="workerExecutor"
    pool-size="1-4"
    queue-capacity="10" rejection-policy="DISCARD" />

Отчитается с результатами

4b9b3361

Ответ 1

Вы можете попробовать

  • Запустите планировщик с задержкой в ​​одну секунду, которая будет блокировать и извлекать все Записи QUEUE, которые до сих пор не заблокированы.
  • Для каждой записи вызовите метод Async, который обработает эту запись и удалит ее.
  • Политика отклонения исполнителя должна быть ABORT, так что планировщик может разблокировать QUEUE, которые еще не выданы для обработки. Таким образом, планировщик может снова обработать эти QUEUE в следующем прогоне.

Конечно, вам придется обрабатывать сценарий, когда планировщик заблокировал QUEUE, но обработчик не закончил обработку по какой-либо причине.

Псевдокод:

public class QueueScheduler {
    @AutoWired
    private QueueHandler queueHandler;

    @Scheduled(fixedDelay = 1000)
    public void doSchedule() throws InterruptedException {
        log.debug("Start schedule");
        List<Long> queueIds = lockAndFetchAllUnlockedQueues();
        for (long id : queueIds)
            queueHandler.process(id);
        log.debug("End schedule");
    }
}

public class QueueHandler {

    @Async
    public void process(long queueId) {
        // process the QUEUE & delete it from DB
    }
}
<task:executor id="workerExecutor" pool-size="1-4" queue-capcity="10"
     rejection-policy="ABORT"/>

Ответ 2

//using a fixedRate of 1 millisecond to get the threads to run basically continuously
@Scheduled(fixedRate = 1)

Когда вы используете @Scheduled, будет создан новый поток и вызовет метод doSchedule в указанном fixedRate с 1 миллисекундой. При запуске приложения вы уже можете увидеть 4 потока, конкурирующих за таблицу QUEUE и, возможно, мертвую блокировку.

Исследуйте, есть ли тупик, принимая дамп потока. http://helpx.adobe.com/cq/kb/TakeThreadDump.html

Аннотации @Async здесь не будут использоваться.

Лучший способ реализовать это - создать класс как поток, выполнив runnable и передав ваш класс в TaskExecutor с нужным количеством потоков.

Используя Spring threading и TaskExecutor, как узнать, когда поток завершен?

Также проверьте свой дизайн, он, похоже, не правильно обрабатывает синхронизацию. Если предыдущее задание выполняется и удерживает блокировку в строке, следующее задание, которое вы создаете, будет по-прежнему видеть эту строку и будет ожидать получения блокировки в этой конкретной строке.