Используя Spring threading и TaskExecutor, как узнать, когда поток завершен?

Хорошо, возможно наивный вопрос здесь. У меня есть служба, которая должна регистрироваться на нескольких сетевых устройствах, запускать команду для каждого и собирать результаты. Для скорости, а не для сбора информации на каждом устройстве в последовательности, мне нужно получить доступ к ним все одновременно и использовать результаты после их завершения.

Используя структуру Spring и Jsch, я могу легко запросить каждое устройство правильно. Когда я сталкиваюсь с некоторой путаницей, я пытаюсь переделать beans для использования TaskExecutor для этого. Что я не могу понять, как это сделать, так это узнать, когда поток завершен.

Что я до сих пор знаю:

public class RemoteCommand {

    private String user;
    private String host;
    private String password;
    private String command;
    private List<String> commandResults;
    private TaskExecutor taskExecutor;

    public RemoteCommand(String user, String host, String password, TaskExecutor taskExecutor) {


     public void setUser(String user) {
    public void setUser(String user) {
        this.user = user;

     public String getUser() {
    public String getUser() {
        return user;

     public void setHost(String host) {
    public void setHost(String host) {
        this.host = host;

     public String getHost() {
    public String getHost() {
        return host;

     public void setPassword(String password) {
    public void setPassword(String password) {
        this.password = password;

     public String getPassword() {
    public String getPassword() {
        return password;

     private void setCommand(String command) {
    private void setCommand(String command) {
        this.command = command;

     private String getCommand() {
    private String getCommand() {
        return command;

     private void setCommandResults(List<String> commandResults) {
    private void setCommandResults(List<String> commandResults) {
        this.commandResults = commandResults;

     * @return the commandResults
    public List<String> getCommandResults(String command) {
        taskExecutor.execute(new CommandTask(command) );

        return commandResults;

     public void setTaskExecutor(TaskExecutor taskExecutor) {
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;

     public TaskExecutor getTaskExecutor() {
    public TaskExecutor getTaskExecutor() {
        return taskExecutor;

    private class CommandTask implements Runnable {

        public CommandTask(String command) {
            System.out.println("test: " + getCommand());

         public void run() {
        public void run() {

            List<String> results = new LinkedList<String>();
            String command = getCommand();

            try {
                JSch jsch = new JSch();

                String user = getUser();
                String host = getHost();

                java.util.Properties config = new java.util.Properties(); 
                config.put("StrictHostKeyChecking", "no");

                host = host.substring(host.indexOf('@') + 1);
                Session session = jsch.getSession(user, host, 22);


                Channel channel = session.openChannel("exec");
                ((ChannelExec) channel).setCommand(command);


                ((ChannelExec) channel).setErrStream(System.err);

                InputStream in = channel.getInputStream();

                byte[] tmp = new byte[1024];
                while (true) {
                    while (in.available() > 0) {
                        int i = in.read(tmp, 0, 1024);
                        if (i < 0)
                        results.add(new String(tmp, 0, i));
                        System.out.print(new String(tmp, 0, i));
                    if (channel.isClosed()) {
                        //System.out.println("exit-status: "
                        //      + channel.getExitStatus());
                    try {
                    } catch (Exception ee) {
            } catch (Exception e) {
            System.out.println("finished running");

В рамках моего теста junit у меня есть:

    public void testRemoteExecution() {

        remoteCommand = (RemoteCommand) applicationContext.getBean("remoteCommand");
        remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx");

            //List<String> results = remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx");
        //for (String line : results) {
        //  System.out.println(line.trim());

Мой файл applicationContext.xml:

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
       <property name="corePoolSize" value="5" />
       <property name="maxPoolSize" value="10" />
       <property name="queueCapacity" value="25" />

<!-- ******************** -->
<!--      Utilities       -->
<!-- ******************** -->

     <bean name="remoteCommand" class="com.xxx.ncc.sonet.utilities.RemoteCommand" scope="prototype">
        <description>Remote Command</description>
        <constructor-arg ref="taskExecutor" />

Я добираюсь до первого println в методе run(). Затем тест выходит из строя без ошибок. Я никогда не попадаю во второй println в нижней части этой процедуры. Я посмотрел на эту тему здесь, что было очень полезно, но не реализовано в стиле Spring. Я уверен, что я пропустил что-то простое или полностью сбежал от рельсов. Любая помощь приветствуется.


Ответ 1

public List<String> getCommandResults(String command) {
    FutureTask task = new FutureTask(new CommandTask(command))

    return task.get(); //or task.get(); return commandResults; - but it not a good practice

Ответ 2

Интерфейс TaskExecutor представляет собой интерфейс "огонь-и-забыть" для использования, когда вам не все равно, когда задача завершается. Это самая простая асинхронная абстракция, которую предлагает Spring.

Однако существует расширенный интерфейс AsyncTaskExecutor, который предоставляет дополнительные методы, включая методы submit(), которые возвращают Future, которые позволяют ждать результата.

Spring предоставляет класс ThreadPoolTaskExecutor, который реализует как TaskExecutor, так и AsyncTaskExecutor.

В вашем конкретном случае я бы повторно выполнил Runnable как Callable и вернул commandResults из метода Callable.call(). Затем метод getCommandResults может быть переопределен как:

public List<String> getCommandResults(String command) {
   Future<List<String>> futureResults = taskExecutor.submit(new CommandTask(command));
   return futureResults.get();

Этот метод будет отправлять задачу асинхронно, а затем дождаться завершения, прежде чем возвращать результаты, возвращенные методом Callable.call(). Это также позволяет избавиться от поля commandResults.