Подтвердить что ты не робот

Как можно использовать threadpool после завершения работы

У меня есть .csv файл, содержащий более 70 миллионов строк, из которых каждая строка должна генерировать Runnable, а затем выполняется threadpool. Этот Runnable будет вставлять запись в Mysql.

Что еще, я хочу записать позицию файла csv для RandomAccessFile, чтобы найти. Позиция написана на Файл. Я хочу записать эту запись, когда все потоки в threadpool закончены. Вызывается ThreadPoolExecutor.shutdown(). Но когда приходит больше строк, мне снова нужен поток. Как я могу повторно использовать этот текущий поток, вместо того, чтобы создавать новый.

Код выглядит следующим образом:

public static boolean processPage() throws Exception {

    long pos = getPosition();
    long start = System.currentTimeMillis();
    raf.seek(pos);
    if(pos==0)
        raf.readLine();
    for (int i = 0; i < PAGESIZE; i++) {
        String lineStr = raf.readLine();
        if (lineStr == null)
            return false;
        String[] line = lineStr.split(",");
        final ExperienceLogDO log = CsvExperienceLog.generateLog(line);
        //System.out.println("userId: "+log.getUserId()%512);

        pool.execute(new Runnable(){
            public void run(){
                try {
                    experienceService.insertExperienceLog(log);
                } catch (BaseException e) {
                    e.printStackTrace();
                }
            }
        });

        long end = System.currentTimeMillis();
    }

    BufferedWriter resultWriter = new BufferedWriter(
            new OutputStreamWriter(new FileOutputStream(new File(
                    RESULT_FILENAME), true)));
    resultWriter.write("\n");
    resultWriter.write(String.valueOf(raf.getFilePointer()));
    resultWriter.close();
    long time = System.currentTimeMillis()-start;
    System.out.println(time);
    return true;
}

Спасибо!

4b9b3361

Ответ 1

Как указано в документации , вы не можете повторно использовать ExecutorService, который был отключен. Я бы рекомендовал против любых обходных решений, поскольку (а) они могут работать не так, как ожидалось, во всех ситуациях; и (б) вы можете достичь того, что хотите, используя стандартные классы.

Вы должны либо

  • создать новый ExecutorService; или

  • не завершать ExecutorService.

Первое решение легко реализуется, поэтому я не буду подробно его описывать.

Для второго, так как вы хотите выполнить действие, как только все поставленные задачи будут завершены, вы можете взглянуть на ExecutorCompletionService и использовать его вместо этого. Он обертывает ExecutorService, который будет управлять потоком, но runnables будут обернуты во что-то, что сообщит ExecutorCompletionService, когда они закончат, чтобы он мог сообщить вам:

ExecutorService executor = ...;
ExecutorCompletionService ecs = new ExecutorCompletionService(executor);

for (int i = 0; i < totalTasks; i++) {
  ... ecs.submit(...); ...
}

for (int i = 0; i < totalTasks; i++) {
  ecs.take();
}

Метод take() в классе ExecutorCompletionService будет блокироваться до завершения задачи (как обычно, так и внезапно). Он вернет a Future, поэтому вы можете проверить результаты, если хотите.

Надеюсь, это может вам помочь, так как я не совсем понял вашу проблему.

Ответ 2

создать и сгруппировать все задачи и отправить их в пул с invokeAll (который возвращается только после успешного завершения всех задач)

Ответ 3

После вызова shutdown в ExecutorService новая задача не будет принята. Это необходимо для создания нового ExecutorService для каждого раунда задач.

Однако, с Java 8 ForkJoinPool.awaitQuiescence. Если вы можете переключиться с обычного ExecutorService на ForkJoinPool, вы можете использовать этот метод, чтобы ждать, пока в ForkJoinPool не будет запущено больше задач без необходимости выключения. Это означает, что вы можете заполнить ForkJoinPool с помощью задач, ожидая, пока он не станет пустым (quiescent), а затем снова начните заполнять его с помощью Задачи и т.д.