Hadoop Streaming: Цепочки для работы - программирование
Подтвердить что ты не робот

Hadoop Streaming: Цепочки для работы

Вопрос заключается в том, как создавать цепочки в Hadoop, используя потоки Hadoop (только).

4b9b3361

Ответ 1

Этот ответ на самом деле задал вопрос. Обычно я цитирую его, но воздержусь от этого, поскольку он настолько велик.


Это документация о том, как связать два или более потоковых задания, используя Hadoop Потоковая передача (в настоящее время 1.0.3).

Чтобы понять конечный код, который будет выполнять цепочку и быть в состоянии написать любую другую работу в цепочке, требуется предварительная, но практическая теория.

Прежде всего, что такое работа в Hadoop? Задача Hadoop

hadoopJob = Configuration + Execution

где

Конфигурация: все настройки, которые делают возможным выполнение.

Выполнение: набор исполняемых или script файлов, которые выполняют желаемую задачу. Другими словами, карта и сокращение шагов нашей задачи.

Configuration = hadoopEnvironment + userEnvironment

где

hasoopEnvironment: это настройка общей среды Hadoop. Эта общая среда определяется из ресурсов, то есть файлов xml, которые лежат в каталоге $HADOOP_HOME/conf. Например, некоторые ресурсы представляют собой core-site.xml, mapred-site.xml и hadoop-site.xml, которые определяют временную директорию hdfs, отслеживание заданий и количество узлов кластера соответственно.

userEnvrironment: заданные пользователем аргументы при запуске задания. В Hadoop эти аргументы называются параметрами.

userEnvironment = genericOptions + streamingOptions

где

genericOptions: они общие в том смысле, что они обращаются к каждой потоковой работе независимо от работы. Они обрабатываются из GenericsOptionsParser.

streamingOptions: они предназначены для работы в том смысле, что они относятся к определенной работе. Например, каждое задание имеет свои собственные каталоги ввода и вывода или файлы. Они обрабатываются из StreamJob.

Схематически

                            hadoopJob
                               /\
                              /  \
                             /    \
                            /      \
                           /        \
                Configuration       Execution
                     /\                 |
                    /  \                |
                   /    \   executable or script files
                  /      \
                 /        \
                /          \
  hadoopEnvironment     userEnvironment
           |                   /\
           |                  /  \
           |                 /    \ 
    $HADOOP_HOME/conf       /      \
                           /        \   
                genericOptions   streamingOptions
                      |                 |
                      |                 |
            GenericOptionsParser    StreamJob

Как можно видеть, все вышеперечисленное представляет собой ряд конфигураций. Часть этого для администратора кластера (hasoopEnvironment), а другая часть для пользователя (userEnvironment) кластера. В заключение, работа в основном конфигурации на абстрактном уровне, если мы забудем на данный момент выполнение часть.

Наш код должен позаботиться обо всем выше. Теперь мы готовы написать код.

Прежде всего, что такое работа Hadoop на уровне кода? Это файл jar. Всякий раз, когда мы отправьте задание, мы передаем файл jar с некоторыми аргументами командной строки. Например когда мы запускаем одно потоковое задание, мы выполняем команду

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/

где

наша работа - hasoop-streaming-1.0.3.jar

с аргументами командной строки -D mapred.reduce.tasks = 1 -mapper m.py -reducer r.py -input/in.txt -output/out/

Внутри этой банки есть наш класс, который заботится обо всем правильно.

Итак, мы открываем новый java файл, например TestChain.java,

// import everything needed

public class TestChain
{
    //code here
    public static void main( String[] args) throws Exception
    {
        //code here
    }//end main

}//end TestChain

Чтобы обработать hasoopEnvironment, наш класс должен наследовать класс Configured. Class Configured дает нам доступ к среде Hadoop и параметрам, то есть к его ресурсам, упомянутым ранее. Ресурсы представляют собой xml файлы, содержащие данные в форме пары имя/значение.

Двигаясь вперед, каждый интерфейс является более или менее средой между внешним миром и задачей, которую мир хочет достичь. Тем не менее, интерфейс - это инструмент, который мы используем для выполнения нашей задачи. Наш класс, следовательно, является инструментом. Для этого наш класс должен реализовать Tool интерфейс, который объявляет метод run(). Этот метод определяет наше поведение инструмента, когда интерфейс реализован, конечно. Наконец, чтобы использовать наш инструмент, мы используем класс ToolRunner. ToolRunner, через класс GenericOptionsParser, также помогает обрабатывать genericOptions из userEnvironment.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
// import everything else needed

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        //code here
        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options  
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

Чтобы завершить изображение, метод run() также называется драйвером, задает задание, включая инициализацию и настройку задания. Обратите внимание, что мы делегировали ToolRunner, связанный с hasoopEnnvironment, первым параметром "new Configuration" метода ToolRunner.run().

Что мы делали до сих пор? Мы просто установили среду, в которой будет работать наш инструмент. Теперь мы должны определить наш инструмент, т.е. Выполнить цепочку.

Поскольку каждое сетевое задание является потоковым заданием, мы создаем каждый из них как таковой. Мы делаем это с использованием метода StreamJob.createJob(String [] args) класса StreamJob, Матрица args строк содержит аргументы командной строки для каждого задания. Эти аргументы командной строки относятся к streamingOptions (заданию) для userEnvironment. Более того, эти аргументы находятся в форме пары параметр/значение. Например, если наша работа имеет файл in.txt в качестве входного/выходного/выходного каталога, m.py как mapper и r.py в качестве редуктора, тогда

String[] example = new String[]
{
    "-mapper"   , "m.py"
    "-reducer"  , "r.py"
    "-input"    , "in.txt"
    "-output"   , "/out/"
}

Вы должны быть осторожны в двух вещах. Во-первых, необходимо "-" . Это та маленькая вещь, которая отличает параметры от ценностей. Здесь mapper - это параметр, а m.py - его значение. Разница понятна из "-" . Во-вторых, если вы добавили пробел между левым и "-" параметра, то этот параметр игнорируется. Если у нас есть "-mapper", то "-mapper" не рассматривается как параметр. Когда StreamJob анализирует матрицу args для пар параметра/значения.Напомним, что задание представляет собой примерно конфигурацию. Мы ожидаем, что StreamJob.creatJob() должен вернуть конфигурацию или что-то подобное этому. Действительно StreamJob.createJob() возвращает a JobConf. Краткое описание объекта JobConf - это описание конкретного задания mapreduce, которое Hadoop понимает и может выполнять, конечно.

Предполагая, что у нас есть три задания для цепи,

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
// import everything else needed

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        String[] job1 = new String[]
        {
            "-mapper"   , "m1.py"
            "-reducer"  , "r1.py"
            "-input"    , "in1.txt"
            "-output"   , "/out1/"
        }
        JobConf job1Conf = new StreamJob.createJob( job1);
        //code here

        String[] job2 = new String[]
        {
            "-mapper"   , "m2.py"
            "-reducer"  , "r2.py"
            "-input"    , "in2.txt"
            "-output"   , "/out2/"
        }
        JobConf job2Conf = new StreamJob.createJob( job2);
        //code here

        String[] job3 = new String[]
        {
            "-mapper"   , "m3.py"
            "-reducer"  , "r3.py"
            "-input"    , "in3.txt"
            "-output"   , "/out3/"
        }
        JobConf job3Conf = new StreamJob.createJob( job3);
        //code here

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options  
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

В этот момент мы устанавливаем среду, в которой наш инструмент будет работать, и мы определили его поведение. Однако мы не применили его в действии. ToolRunner не достаточно. ToolRunner, запускает наш инструмент в целом. Он не запускает отдельную целые рабочие места. Мы должны это сделать.

Есть два способа сделать это. Первый способ - использовать JobClient, а второй - использовать JobControl.

Первый способ - JobClient

С JobClient мы запускаем задания цепочки как последовательность, одно задание выполняется за другим вызов JobClient для каждого задания. Метод, выполняющий каждое отдельное задание, JobClient.runJob(jobtorun), где jobtorun - это объект JobConf.

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        String[] job1 = new String[]
        {
            "-mapper"   , "m1.py"
            "-reducer"  , "r1.py"
            "-input"    , "in1.txt"
            "-output"   , "/out1/"
        }
        JobConf job1Conf = new StreamJob.createJob( job1);
        JobClient.runJob( job1Conf);

        String[] job2 = new String[]
        {
            "-mapper"   , "m2.py"
            "-reducer"  , "r2.py"
            "-input"    , "in2.txt"
            "-output"   , "/out2/"
        }
        JobConf job2Conf = new StreamJob.createJob( job2);
        JobClient.runJob( job2Conf);

        String[] job3 = new String[]
        {
            "-mapper"   , "m3.py"
            "-reducer"  , "r3.py"
            "-input"    , "in3.txt"
            "-output"   , "/out3/"
        }
        JobConf job3Conf = new StreamJob.createJob( job3);
        JobClient.runJob( job3Conf);

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options  
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

Преимущество этого способа, используя JobClient, заключается в том, что на стандартном выходе печатается задание.

Недостатком JobClient является то, что он не может заботиться о зависимостях между заданиями.

Второй способ - JobControl

С JobControl все сетевые задания являются частью группы заданий. Здесь каждое задание выполняется в кадре этой группы. Это означает, что каждое задание цепочки должно быть добавлено сначала в группу, а затем группа - это та, которая выполняется. Группа является FIFO или выполнение каждого задания в группе следует схеме FCFS (First Come First Served). Каждое задание добавляется в группу с помощью метода JobControl.addJob(jobtoadd).

JobControl может обрабатывать зависимости с помощью метода x.addDependingJob(y) где job x зависит от работы y. Это означает, что задание x не может работать до завершения задания y. Если задание x зависит от обоих заданий y и z и z не зависит от y, то с x.addDependingJob(y) и x.addDependingJob(z), мы можем выразить эти зависимостей.

JobControl противоречит JobClient, "работает" с объектами Job. Когда мы звоним например, x.addDependingJob(y) method, x, y - объекты Job. То же самое имеет место для JobControl.addJob(jobtoadd), jobtoadd - объект Job. Каждый объект Job созданный из объекта JobConf. Возвращаясь к коду, мы имеем

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {

        //TestChain below is an arbitrary name for the group
        JobControl jobc = new JobControl( "TestChain");

        String[] job1 = new String[]
        {
            "-mapper"   , "m1.py"
            "-reducer"  , "r1.py"
            "-input"    , "in1.txt"
            "-output"   , "/out1/"
        }
        JobConf job1Conf = new StreamJob.createJob( job1);
        Job job1 = new Job( job1conf);
        jobc.addJob( job1);

        String[] job2 = new String[]
        {
            "-mapper"   , "m2.py"
            "-reducer"  , "r2.py"
            "-input"    , "in2.txt"
            "-output"   , "/out2/"
        }
        JobConf job2Conf = new StreamJob.createJob( job2);
        Job job2 = new Job( job2conf);
        jobc.addJob( job2);

        String[] job3 = new String[]
        {
            "-mapper"   , "m3.py"
            "-reducer"  , "r3.py"
            "-input"    , "/out2/par*"
            "-output"   , "/out3/"
        }
        JobConf job3Conf = new StreamJob.createJob( job3);
        Job job3 = new Job( job3conf);
        job3.addDependingJob( job2);
        jobc.addJob( job3);

        //code here

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options  
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

В приведенном выше коде обратите внимание, что job3 зависит от job2. Как вы можете видеть ввод job3 это выход job2. Этот факт является зависимостью. job3 ожидает завершения задания 2.

До сих пор мы просто добавляли цепочки в группу и описывали их зависимость. Для запуска этой группы заданий нужно еще одно.

Грубая сила говорит просто вызвать метод JobControl.run(). Хотя это подход работает, это проблематично. Когда рабочие задания завершены, весь работа все еще работает вечно. Подход, который работает правильно, - это определение нового Тема выполнения нашей работы. Тема, которая уже существует (когда выполняется задание). Затем мы можем подождать, пока выполняются цепочки, а затем выйдут. Между тем выполнения цепочки заданий, мы можем запросить информацию о выполнении задания, например сколько заданий закончилось или мы можем найти, если работа находится в недействительном состоянии и что и есть.

Преимущество этого способа использования JobControl заключается в том, что он может заботиться о многих которые могут существовать между заданиями.

Недостатком JobControl является то, что прогресс работы не печатается на стандартный выход, он не представлен прямо. Не работает ли работа или успешно, ничего полезного не печатается. Вы должны проверить веб-интерфейс Hadoop или добавить некоторый код в цикл while ниже, чтобы отслеживать состояние задания или что угодно необходимо. Наконец,

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {

        //TestChain below is an arbitrary name for the group
        JobControl jobc = new JobControl( "TestChain");

        String[] job1 = new String[]
        {
            "-mapper"   , "m1.py"
            "-reducer"  , "r1.py"
            "-input"    , "in1.txt"
            "-output"   , "/out1/"
        }
        JobConf job1Conf = new StreamJob.createJob( job1);
        Job job1 = new Job( job1conf);
        jobc.addJob( job1);

        String[] job2 = new String[]
        {
            "-mapper"   , "m2.py"
            "-reducer"  , "r2.py"
            "-input"    , "in2.txt"
            "-output"   , "/out2/"
        }
        JobConf job2Conf = new StreamJob.createJob( job2);
        Job job2 = new Job( job2conf);
        jobc.addJob( job2);

        String[] job3 = new String[]
        {
            "-mapper"   , "m3.py"
            "-reducer"  , "r3.py"
            "-input"    , "/out2/par*"
            "-output"   , "/out3/"
        }
        JobConf job3Conf = new StreamJob.createJob( job3);
        Job job3 = new Job( job3conf);
        job3.addDependingJob( job2);
        jobc.addJob( job3);

        Thread runjobc = new Thread( jobc);
        runjobc.start();

        while( !jobc.allFinished())
        {
            //do whatever you want; just wait or ask for job information
        }

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options  
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

ОШИБКИ

В этом разделе обсуждаются некоторые ошибки, которые могут возникнуть. В приведенных ниже сообщениях об ошибках есть класс OptimizingJoins. Этот класс является классом, чтобы продемонстрировать различные ошибки и не имеет никакого отношения к этому обсуждению.

Пакет не существует при попытке скомпилировать.

Это вопрос класса. Скомпилируйте (например, добавить пакет hasoop-streaming-1.0.3.jar),

javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java

и добавьте недостающий пакет.

java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob

Общая ошибка:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
at OptimizingJoins.run(OptimizingJoins.java:135)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at OptimizingJoins.main(OptimizingJoins.java:248)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.streaming.StreamJob
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 8 more

Это вопрос файла манифеста нашего файла jar. Когда мы скомпилируем нашу работу в путь выше, все в порядке. Компилятор Java находит все, что ему нужно. Но когда мы запускаем нашу работу в Hadoop с помощью команды

$HADOOP_HOME/bin/hadoop jar /home/hduser/TestChain.jar TestChain

тогда JVM, который запускает нашу банку, не может найти StreamJob. Чтобы решить эту проблему, когда мы создайте файл jar, мы помещаем в банку файл манифеста, который содержит класс- путь StreamJob. Практически,

MANIFEST.MF

Manifest-Version: 1.0
Class-Path: /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar
Created-By: 1.7.0_07 (Oracle Corporation)

Обратите внимание, что файл MANIFEST.MF всегда заканчивается пустой строкой. Наш MANIFEST.MF файл имеет 4 строки, а не 3. Затем мы создаем файл jar, например,

jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class 

ОШИБКА streaming.StreamJob: непризнанная опция: -D

Эта ошибка возникает из-за того, что StreamJob не может проанализировать параметр -D. StreamJob может анализировать только потоковые, специальные задания, -D - общий вариант.

Существует два решения этой проблемы. Первое решение - использовать -jobconf вместо -D. Второе решение - проанализировать опцию -D через Объект GenericOptionsParser. Во втором решении, конечно, вы должны удалить параметр -D из аргументов StreamJob.createJob().

Чтобы привести пример, реализация "чистого" кода второго решения:

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain
{
    public class Job1 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job1 = new String[]
            {
                "-mapper"   , "m1.py"
                "-reducer"  , "r1.py"
                "-input"    , "in1.txt"
                "-output"   , "/out1/"
            }
            JobConf job1Conf = new StreamJob.createJob( job1);
            JobClient.runJob( job1Conf);

            return 0;
        }//end run
    }

    public class Job2 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job2 = new String[]
            {
                "-mapper"   , "m2.py"
                "-reducer"  , "r2.py"
                "-input"    , "in2.txt"
                "-output"   , "/out2/"
            }
            JobConf job2Conf = new StreamJob.createJob( job2);
            JobClient.runJob( job2Conf);

            return 0;
        }//end run
    }

    public class Job3 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job3 = new String[]
            {
                "-mapper"   , "m3.py"
                "-reducer"  , "r3.py"
                "-input"    , "in3.txt"
                "-output"   , "/out3/"
            }
            JobConf job3Conf = new StreamJob.createJob( job3);
            JobClient.runJob( job3Conf);

            return 0;
        }//end run
    }


    public static void main( String[] args) throws Exception
    {
        TestChain tc = new TestChain();

        //Domination
        String[] j1args = new String[]
        {
            "-D", "mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
            "-D", "mapred.text.key.comparator.options=-k1,1"    ,
            "-D", "mapred.reduce.tasks=1"
        };

        // Let ToolRunner handle generic command-line options   
        int j1res = ToolRunner.run( new Configuration(), tc.new Job1(), j1args);

        //Cost evaluation
        String[] j2rgs = new String[]
        {
            "-D", "mapred.reduce.tasks=12 "                 ,
            "-D", "mapred.text.key,partitioner.options=-k1,1"
        };

        // Let ToolRunner handle generic command-line options   
        int j2res = ToolRunner.run( new Configuration(), tc.new Job2(), j2args);

        //Minimum Cost
        String[] j3args = new String[]
        {
            "-D", "mapred.reduce.tasks=1"
        };

        // Let ToolRunner handle generic command-line options   
        int j3res = ToolRunner.run( new Configuration(), tc.new Job1(), j3args);
        System.exit( mres);
    }
}//end TestChain    

В приведенном выше коде мы определяем глобальный класс TestChain, который инкапсулирует целые рабочие места. Затем мы определяем каждое отдельное цепное задание, т.е. Определяем его метод запуска. Каждое задание цепочки - это класс, который наследует Configured и реализует Tool. В заключение, из основного метода TestChain мы запускаем каждое задание последовательно. Обратите внимание, что перед запуском любое задание цепи мы определяем его общие варианты.

Вкомпилировать

javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java 

Баночка

jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class TestChain\$Dom.class TestChain\$Cost.class TestChain\$Min.class 

ERROR security.UserGroupInformation: PriviledgedActionException as: hduser cause: org.apache.hadoop.mapred.InvalidInputException: Input Pattern hdfs://localhost: 54310/user/hduser/whateverFile соответствует 0 файлам

Эта ошибка возникает, когда мы используем JobControl. Например, если задание имеет в качестве входных данных результат предыдущего задания, то, если этот файл ввода-вывода уже не существует, эта ошибка возникает. JobControl запускает все независимые задания в "параллельном", а не один за другим, как JobClient. Таким образом, Jobcontrol пытается запустить задание, чьи входные файлы не существуют и по этой причине терпит неудачу.

Чтобы избежать этой ситуации, мы заявляем, что существует зависимость между этими двумя заданиями с использованием x.addDependingJob(y), задание x зависит от задания y. Теперь JobControl не пытается запускать параллельные зависимые задания.