Подтвердить что ты не робот

Используя Spring threading и TaskExecutor, как узнать, когда поток завершен?

Хорошо, возможно наивный вопрос здесь. У меня есть служба, которая должна регистрироваться на нескольких сетевых устройствах, запускать команду для каждого и собирать результаты. Для скорости, а не для сбора информации на каждом устройстве в последовательности, мне нужно получить доступ к ним все одновременно и использовать результаты после их завершения.

Используя структуру Spring и Jsch, я могу легко запросить каждое устройство правильно. Когда я сталкиваюсь с некоторой путаницей, я пытаюсь переделать beans для использования TaskExecutor для этого. Что я не могу понять, как это сделать, так это узнать, когда поток завершен.

Что я до сих пор знаю:

public class RemoteCommand {

    private String user;
    private String host;
    private String password;
    private String command;
    private List<String> commandResults;
    private TaskExecutor taskExecutor;

    public RemoteCommand(String user, String host, String password, TaskExecutor taskExecutor) {

        setUser(user);
        setHost(host);
        setPassword(password);
        setTaskExecutor(taskExecutor);
    }

    /**
     * @param user the user to set
     */
    public void setUser(String user) {
        this.user = user;
    }

    /**
     * @return the user
     */
    public String getUser() {
        return user;
    }

    /**
     * @param host the host to set
     */
    public void setHost(String host) {
        this.host = host;
    }

    /**
     * @return the host
     */
    public String getHost() {
        return host;
    }

    /**
     * @param password the password to set
     */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * @return the password
     */
    public String getPassword() {
        return password;
    }

    /**
     * @param command the command to set
     */
    private void setCommand(String command) {
        this.command = command;
    }

    /**
     * @return the command
     */
    private String getCommand() {
        return command;
    }

    /**
     * @param commandResults the commandResults to set
     */
    private void setCommandResults(List<String> commandResults) {
        this.commandResults = commandResults;
    }

    /**
     * @return the commandResults
     */
    public List<String> getCommandResults(String command) {
        taskExecutor.execute(new CommandTask(command) );

        return commandResults;
    }

    /**
     * @param taskExecutor the taskExecutor to set
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * @return the taskExecutor
     */
    public TaskExecutor getTaskExecutor() {
        return taskExecutor;
    }

    private class CommandTask implements Runnable {

        public CommandTask(String command) {
            setCommand(command);
            System.out.println("test: " + getCommand());
        }

        /**
         * 
         * @param command
         */
        public void run() {

            List<String> results = new LinkedList<String>();
            String command = getCommand();

            try {
                System.out.println("running");
                JSch jsch = new JSch();

                String user = getUser();
                String host = getHost();

                java.util.Properties config = new java.util.Properties(); 
                config.put("StrictHostKeyChecking", "no");

                host = host.substring(host.indexOf('@') + 1);
                Session session = jsch.getSession(user, host, 22);

                session.setPassword(getPassword());
                session.setConfig(config);
                session.connect();

                Channel channel = session.openChannel("exec");
                ((ChannelExec) channel).setCommand(command);

                channel.setInputStream(null);

                ((ChannelExec) channel).setErrStream(System.err);

                InputStream in = channel.getInputStream();

                channel.connect();
                byte[] tmp = new byte[1024];
                while (true) {
                    while (in.available() > 0) {
                        int i = in.read(tmp, 0, 1024);
                        if (i < 0)
                            break;
                        results.add(new String(tmp, 0, i));
                        System.out.print(new String(tmp, 0, i));
                    }
                    if (channel.isClosed()) {
                        //System.out.println("exit-status: "
                        //      + channel.getExitStatus());
                        break;
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (Exception ee) {
                        ee.printStackTrace();
                    }
                }
                channel.disconnect();
                session.disconnect();
            } catch (Exception e) {
                System.out.println(e);
            }
            setCommandResults(results);
            System.out.println("finished running");
        }
    }
}

В рамках моего теста junit у меня есть:

@Test
    public void testRemoteExecution() {

        remoteCommand = (RemoteCommand) applicationContext.getBean("remoteCommand");
        remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx");

            //List<String> results = remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx");
        //for (String line : results) {
        //  System.out.println(line.trim());
        //}
    }

Мой файл applicationContext.xml:

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
       <property name="corePoolSize" value="5" />
       <property name="maxPoolSize" value="10" />
       <property name="queueCapacity" value="25" />
    </bean>        

<!-- ******************** -->
<!--      Utilities       -->
<!-- ******************** -->

     <bean name="remoteCommand" class="com.xxx.ncc.sonet.utilities.RemoteCommand" scope="prototype">
        <description>Remote Command</description>
        <constructor-arg><value>${remote.user}</value></constructor-arg>
        <constructor-arg><value>${remote.host}</value></constructor-arg>
        <constructor-arg><value>${remote.password}</value></constructor-arg>
        <constructor-arg ref="taskExecutor" />
    </bean> 

Я добираюсь до первого println в методе run(). Затем тест выходит из строя без ошибок. Я никогда не попадаю во второй println в нижней части этой процедуры. Я посмотрел на эту тему здесь, что было очень полезно, но не реализовано в стиле Spring. Я уверен, что я пропустил что-то простое или полностью сбежал от рельсов. Любая помощь приветствуется.

4b9b3361

Ответ 1

public List<String> getCommandResults(String command) {
    FutureTask task = new FutureTask(new CommandTask(command))
    taskExecutor.execute(task);

    return task.get(); //or task.get(); return commandResults; - but it not a good practice
}

Ответ 2

Интерфейс TaskExecutor представляет собой интерфейс "огонь-и-забыть" для использования, когда вам не все равно, когда задача завершается. Это самая простая асинхронная абстракция, которую предлагает Spring.

Однако существует расширенный интерфейс AsyncTaskExecutor, который предоставляет дополнительные методы, включая методы submit(), которые возвращают Future, которые позволяют ждать результата.

Spring предоставляет класс ThreadPoolTaskExecutor, который реализует как TaskExecutor, так и AsyncTaskExecutor.

В вашем конкретном случае я бы повторно выполнил Runnable как Callable и вернул commandResults из метода Callable.call(). Затем метод getCommandResults может быть переопределен как:

public List<String> getCommandResults(String command) {
   Future<List<String>> futureResults = taskExecutor.submit(new CommandTask(command));
   return futureResults.get();
}

Этот метод будет отправлять задачу асинхронно, а затем дождаться завершения, прежде чем возвращать результаты, возвращенные методом Callable.call(). Это также позволяет избавиться от поля commandResults.