Подтвердить что ты не робот

Как создать механизм выполнения последовательности задач

Я пытаюсь закодировать проблему на Java, где мне нужно выполнить кучу задач.

Проблема

Выполнение задания, которое состоит из нескольких задач, и эти задачи имеют зависимости между ними.

У задания будет список задач, и каждая такая задача будет иметь список заданий-преемников (каждая задача-преемник будет иметь свои собственные задачи-преемника - здесь вы можете увидеть рекурсивный характер). Каждая последующая задача может начать свое выполнение, если -

  • Он настроен на выполнение при частичном выполнении его предшественника. В этом случае задача предшественника сообщит, что она завершилась частично, и мои задачи-преемники могут начинаться

  • Успешное завершение задачи предшественника.

Пример

Работа, имеющая 2 начальные задачи A и B. A имеет 2 задания-преемника M и N. B имеет одну задачу-преемника P. P имеет две задачи-преемника Y и Z.

M может начать с частичного завершения своей предшествующей задачи A. Z может начать с частичного завершения своей предшествующей задачи P. N, P и Y могут запускаться только по завершении своих предшественников A, B и P соответственно.

Tasks Hierarchy (A and B can start in parallel)

Мне нужно спроектировать выполнение такого рабочего процесса/задания. В проекте мы должны признать событие частичного завершения, отправленное задачей предшественника, чтобы можно было запустить его задачу-преемника. Как мне это сделать? Есть ли какой-либо шаблон проектирования, который подходит для этой проблемы в concurrency?

4b9b3361

Ответ 1

Взгляните на akka - http://akka.io

с помощью akka вы создаете участников (управляемые событиями, параллельные сущности, обрабатывающие сообщения асинхронно)

каждая задача может быть представлена ​​как актер (вы выбираете, когда его запускать)

вы можете запускать других участников (задач) на частичное полное или полное заполнение (на самом деле вы можете запускать их, когда захотите)

Ответ 2

Ваша проблема выглядит как хороший пример использования Java ForkJoin Framework. Вы могли бы реализовать свои задачи как RecursiveAction или RecursiveTask (в зависимости от того, нужно ли вам возвращать или нет), которые будут запускать свои подзадачи при любом условии, в котором вы нуждаетесь. Вы также сможете контролировать, выполняются ли ваши подчиненные задачи последовательно или параллельно.

Пример:

public class TaskA extends RecursiveAction {
  // ...

  protected void compute() {
    if (conditionForTaskM) {
      TaskM m = new TaskM();
      // Run task M asynchronously or use m.invoke() to run it synchronously.
      invokeAll(m);
    }

    // Run task N at the end of A
    invokeAll(new TaskN());
  }

}

Для выполнения задач вам нужен экземпляр ForkJoinPool:

public static void main(String[] args) {
  ForkJoinPool pool = new ForkJoinPool();
  pool.submit(new TaskA());

  // Properly shutdown your pool...
}

Этот пример довольно упрощен в реализации части вашего примера проблемы. Но, вообще говоря, ForkJoin Framework позволяет создавать древовидные структуры задач, где каждая родительская задача (такая как A, B и P) позволяет вам контролировать выполнение своих непосредственных дочерних задач.

Ответ 3

Давайте начнем с нескольких предположений, чтобы упростить.

  • Фактические задания немного
  • это не распределенная среда
  • задача будет иметь только один процессор
  • Не критическая система
  • Производительность не требуется

:)

Основываясь на вышеприведенных предположениях, я создал https://gist.github.com/ankgupta/98148b8eead2fbbc2bbb

interface Task{
    public void doTask();
}

----EE.java

import java.util.*;

public class EE {
    private static EE ee = new EE();
    Map<String, Task> tasks = new HashMap<>();
    private EE(){ }

    public static EE getEE(){
        return ee;
    }

    public void register(String event, Task t){
        tasks.put(event, t);
    }

    public void message(String event){
        Task t = tasks.get(event);
        if(t!=null)
            t.doTask();
    }

}

class TaskA implements Task{
    public void doTask(){
        System.out.println("TaskA working!");
        EE.getEE().message("TASKA_PARTIAL");
        System.out.println("TaskA still working!");
        EE.getEE().message("TASKA_COMPLETE");
    }
}


class TaskB implements Task{
    public void doTask(){
        System.out.println("TaskB working!");
    }
}


class TaskC implements Task{
    public void doTask(){
        System.out.println("TaskC working!");
    }
}


public class Main{
    public static void main(String[] args){
        EE ee = EE.getEE();
        ee.register("STARTA", new TaskA());
        ee.register("TASKA_PARTIAL", new TaskB());
        ee.register("TASKA_COMPLETE", new TaskC());
        ee.message("STARTA");
    }
}

Улучшение выше

  • у вас должно быть несколько рабочих, чтобы добавить параллельную обработку задач.
  • для обработки ошибок, если задача не выполняется, должно быть отправлено соответствующее сообщение для вызова задачи для ее обработки.
  • вы также должны использовать реальную очередь для событий.
  • В случае использования нескольких процессоров для одной задачи необходимо использовать алгоритм распределения задач.
  • для распределенной среды вам придется обрабатывать имена, CAP...

Ответ 4

2 варианта с точки зрения использования шаблонов, но по сути они довольно похожи. В обоих случаях ситуация с циклическими зависимостями должна обрабатываться как ошибка в конфигурации зависимостей задач. например A → B → A

  • Посредник

После завершения каждой задачи [i] он уведомит посредника, Посредник уведомит все последующие задачи задачи [i]. График зависимостей задач будет читаться, когда механизм выполнения запускается в качестве структуры данных для использования Медиатором.

  1. Публиковать/подписывать шаблон через шину сообщения.

График зависимостей задачи будет считан по мере запуска двигателя, и каждая задача будет подписана на MessageBus своего предшественника task (task [i]) сообщение о завершении по теме [i]

Когда каждая задача [i] завершается, она отправит сообщение завершения в MessageBus в теме [i]. Каждая задача, подписавшаяся на тему [i], получит уведомление и начнет работать.

Ответ 5

Ваш вопрос интересен, потому что кто-то может имитировать простую нейронную сеть, используя этот дизайн. Что касается ответа, я хотел бы рассмотреть проблему как задачу, а не многопоточность / concurrency, потому что concurrency может быть достигнуто просто путем выполнения упорядоченных задач. Теперь попробуйте использовать управляемое событиями программирование, чтобы добиться этого, потому что оно позволяет хорошо сочетать компоненты. Итак, теперь наш дизайн имеет реактивную природу, поэтому мы будем беспокоиться о том, чтобы сигнализировать о зависимых задачах, как только мы закончим, давайте рассмотрим здесь шаблон наблюдателя.

Ваши задачи как наблюдаемые, так и наблюдатели, потому что они ждут уведомления от предшественника и уведомляют преемника, что дает нам следующую конструкцию.

// Task.java
public abstract class Task extends Observable implements Runnable, Observer {
    private final Mutex lock = new Mutex();
    private final String taskId;

    public String getTaskId() {
        return this.taskId;
    }

    private final Set<String> completedTasks;
    private final Set<String> shouldCompletedTasksBeforeStart;

    public Task(final String taskId) {
        this.taskId = taskId;
        this.completedTasks = new HashSet<>();
        this.shouldCompletedTasksBeforeStart = new HashSet<>();
    }

    @Override
    public void run() {
        while (true) {
            this.lock.getLock();
            if (this.completedTasks.equals(this.shouldCompletedTasksBeforeStart)) {
                doWork();
                setChanged();
                notifyObservers(this.taskId);
                // reset
                this.completedTasks.clear();
            }
            this.lock.freeLock();
            try {
                // just some sleep, you change to how it fits you
                Thread.sleep(1000);
            } catch (final InterruptedException e) {
                // TODO Auto-generated catch block
            }
        }
    }

    @Override
    public void update(final Observable observable, final Object arg) {
        this.lock.getLock();
        this.completedTasks.add((String) arg);
        this.lock.freeLock();
    }

    public void addPredecessorTask(final Task task) {
        if (this.taskId.equals(task.taskId)) {
            return;
        }
        this.lock.getLock();
        // Notice here, it is a little logic make your predecessor/successor work
        task.addObserver(this);
        this.shouldCompletedTasksBeforeStart.add(task.taskId);
        this.lock.freeLock();
    }

    protected abstract void doWork();

}

//HelloTask.java
public static class HelloTask extends Task {
    public HelloTask(final String taskId) {
        super(taskId);
    }

    @Override
    protected void doWork() {
        System.out.println("Hello from " + getTaskId() + "!");
    }
}

//Main.java
public class Main {
    public static void main(final String[] args) {
        final HelloTask helloTaskA = new HelloTask("A");
        final HelloTask helloTaskB = new HelloTask("B");
        final HelloTask helloTaskC = new HelloTask("C");

        helloTaskA.addPredecessorTask(helloTaskB);
        helloTaskC.addPredecessorTask(helloTaskB);

        final ExecutorService pool = Executors.newFixedThreadPool(10);
        pool.execute(helloTaskC);
        pool.execute(helloTaskA);
        pool.execute(helloTaskB);

    }
}

Реализация очень простая, и вы улучшаете ее, но она предоставляет вам базовую структуру. Было бы интересно узнать, где вы применяете это?

Ответ 6

Если я хорошо понял вашу потребность, вы можете использовать механизм рабочего процесса, например Activity, для его решения. Я думаю, что это будет проще, чем повторное создание механизма документооборота для вашей конкретной потребности.

Ответ 7

Если вы хотите изобрести колесо и разработать решение самостоятельно, отлично - это ваш выбор. Однако делать это правильно довольно сложно, особенно резьбовая часть. Однако, если вы можете рассмотреть некоторую внешнюю помощь по крайней мере с помощью строительных блоков, это могут быть:

  • guava ListenableFuture - используя эту библиотеку, вы можете создать Callable и передать их в специальный исполнитель пула потоков, который затем позволяет настраивать обратные вызовы на ListenableFuture завершении.
  • вы можете взглянуть на RX Java Observable, который позволяет смешивать и сопоставлять различные задачи. Это использует неинвазивный стиль кодирования, поэтому будьте осторожны!

Ответ 8

Ваша проблема, похоже, является модифицированной версией шаблона Observer. Нижеприведенное решение более общее, чем то, что позволяет зависеть от списка задач.

Создайте задачу класса следующим образом:

class Task{
List<Task> partialCompletionSuccessors;//List of tasks dependent on the partial completeion of this Task
List<Task> fullCompletetionSuccessors;//List of tasks dependent on the full completeion of this Task

List<Task> partialCompletionPredeccessor;//List of tasks that this task depends on their partial completion to start
List<Task> fullCompletetionPredeccessor;//List of tasks that this task depends on their full completion to start


private void notifySuccessorsOfPartialCompletion(){
    for(Task task: partialCompletionSuccessors){
           task.notifyOfPartialCompletion(this);
    }

}

private void notifySuccessorsOfFullCompletion(){
    for(Task task: fullCompletetionSuccessors){
           task.notifyOfPartialCompletion(this);
    }

}

private tryToProcceed(){
 if(partialCompletionPredeccessor.size() == 0 && fullCompletetionPredeccessor.size()==0 ){
     //Start the following task...
      ....
     //When this task partially completes
    notifySuccessorsOfPartialCompletion();

      //When this task fully completes
      notifySuccessorsOfFullCompletion();

  }

}


public void notifyOfPartialCompletion(Task task){// A method to notify the following task that a predeccessor task has partially completed
      partialCompletionPredeccessor.remove(task);
      tryToProcceed();

}

public void notifyOfFullCompletion(Task task){// A method to notify the following task that a predeccessor task has partially completed
      fullCompletetionPredeccessor.remove(task);
      tryToProcceed();

}

}

Ответ 9

Этот вопрос очень загружен. Я могу видеть по крайней мере три разные подсистемы в вашем дизайне:

  • задача DAG (частичное завершение для меня означает, что у вас на самом деле есть 2 узла вместо 1), которые могут сохраняться в базе данных, например:
  • рабочая очередь, в которой должна выполняться следующая задача для выполнения (это может быть некоторая очередь сообщений, опять же для сохранения/транзакции);
  • реальная структура выполнения, которая может включать доступ к внешним ресурсам, таким как службы/базы данных, и потенциально может быть распределена.

Ваше описание очень высокое, чтобы вы могли приступить к разработке абстракций, которые вам нужны для этих трех частей, и как они взаимодействуют друг с другом (с точки зрения интерфейсов).

Как только у вас есть все это, вы можете начать предоставлять некоторые простые реализации. Я бы начал с "локального режима", используя простой в DAG памяти, блокирующую очередь и некоторый тип исполнителя java.

В вашем вопросе не содержится подробная информация об SLA, длительности заданий, политике отказа/повтора, транзакциях и т.д., поэтому трудно сказать, как должны быть реализованы ваши модули. Но я предлагаю подумать в терминах абстракций высокого уровня и повторить реализацию. Отличный код никогда не исправит плохой дизайн.

Вы можете остановиться там или начать замену каждой реализации сторонними продуктами, если вам нужно.

Ответ 10

Существует специально разработанная для этой цели структура, называемая Dexecutor, с Dexecutor, вы моделируете свои требования в терминах графика, когда в исполнении, исполнитель должен позаботиться о том, чтобы он выполнял его надежно.

Например:

@Test
public void testDependentTaskExecution() {

    ExecutorService executorService = newExecutor();
    ExecutionEngine<Integer, Integer> executionEngine = new DefaultExecutionEngine<>(executorService);

    try {
        DefaultDependentTasksExecutor<Integer, Integer> executor = new DefaultDependentTasksExecutor<Integer, Integer>(
                executionEngine, new SleepyTaskProvider());

        executor.addDependency(1, 2);
        executor.addDependency(1, 2);
        executor.addDependency(1, 3);
        executor.addDependency(3, 4);
        executor.addDependency(3, 5);
        executor.addDependency(3, 6);
        executor.addDependency(2, 7);
        executor.addDependency(2, 9);
        executor.addDependency(2, 8);
        executor.addDependency(9, 10);
        executor.addDependency(12, 13);
        executor.addDependency(13, 4);
        executor.addDependency(13, 14);
        executor.addIndependent(11);

        executor.execute(ExecutionConfig.NON_TERMINATING);

        Collection<Node<Integer, Integer>> processedNodesOrder = Deencapsulation.getField(executor, "processedNodes");
        assertThat(processedNodesOrder).containsAll(executionOrderExpectedResult());
        assertThat(processedNodesOrder).size().isEqualTo(14);

    } finally {
        try {
            executorService.shutdownNow();
            executorService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {

        }
    }
}

private Collection<Node<Integer, Integer>> executionOrderExpectedResult() {
    List<Node<Integer, Integer>> result = new ArrayList<Node<Integer, Integer>>();
    result.add(new Node<Integer, Integer>(1));
    result.add(new Node<Integer, Integer>(2));
    result.add(new Node<Integer, Integer>(7));
    result.add(new Node<Integer, Integer>(9));
    result.add(new Node<Integer, Integer>(10));
    result.add(new Node<Integer, Integer>(8));
    result.add(new Node<Integer, Integer>(11));
    result.add(new Node<Integer, Integer>(12));
    result.add(new Node<Integer, Integer>(3));
    result.add(new Node<Integer, Integer>(13));
    result.add(new Node<Integer, Integer>(5));
    result.add(new Node<Integer, Integer>(6));
    result.add(new Node<Integer, Integer>(4));
    result.add(new Node<Integer, Integer>(14));
    return result;
}

private ExecutorService newExecutor() {
    return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
}

private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {

    public Task<Integer, Integer> provideTask(final Integer id) {

        return new Task<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            public Integer execute() {
                if (id == 2) {
                    throw new IllegalArgumentException("Invalid task");
                }
                return id;
            }
        };
    }
}

Вот модельный граф

dexecutor-graph.png

Это означает, что задачи № 1, 12 и 11 будут выполняться параллельно, как только один из них завершит выполнение своих зависимых задач, например, как только задача № 1 завершится, его зависимые задачи №2 и №3 начнут запускать

Ответ 11

Тезисы из многопоточности

class TaskA{
    SimpleTask Y;
    SimpleTask Z;

    SimpleTask PJ;
    SimpleTask RJ;

    Run(){
        // do the partial job
        PJ.Run();
        Y.Run();
        // do the remaining job
        RJ.Run();
        Z.Run();
        // return;
    }
} 

class TaskB{
    TaskA P;
    SimpleTask J;

    Run(){
        // do the job
        J.Run();
        P.Run();
        // return;
    }
}