Подтвердить что ты не робот

SwingWorker, done() выполняется до завершения вызовов()

Я некоторое время работал с SwingWorker и, по крайней мере, для меня оказался странным поведением. Я четко понимаю, что из-за соображений производительности несколько призывов к publish() методом коллажируются в одном вызове. Это имеет смысл для меня, и я подозреваю, что SwingWorker хранит какую-то очередь для обработки всех вызовов.

Согласно tutorial и API, когда SwingWorker завершает свое выполнение, doInBackground() заканчивается нормально или рабочий поток отменяется извне, тогда вызывается done(). Пока все хорошо.

Но у меня есть пример (как показано в учебниках), где выполняются вызовы метода process() done после done(). Поскольку оба метода выполняются в Thread Dispatch Thread, я ожидал бы, что done() будет выполнен после завершения всех завершений process(). Другими словами:

Ожидаемое:

Writing...
Writing...
Stopped!

Результат:

Writing...
Stopped!
Writing...

Пример кода

import java.awt.BorderLayout;
import java.awt.Dimension;
import java.awt.Graphics;
import java.awt.event.ActionEvent;
import java.util.List;
import javax.swing.AbstractAction;
import javax.swing.Action;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;

public class Demo {

    private SwingWorker<Void, String> worker;
    private JTextArea textArea;
    private Action startAction, stopAction;

    private void createAndShowGui() {

        startAction = new AbstractAction("Start writing") {
            @Override
            public void actionPerformed(ActionEvent e) {
                Demo.this.startWriting();
                this.setEnabled(false);
                stopAction.setEnabled(true);
            }
        };

        stopAction = new AbstractAction("Stop writing") {
            @Override
            public void actionPerformed(ActionEvent e) {
                Demo.this.stopWriting();
                this.setEnabled(false);
                startAction.setEnabled(true);
            }
        };

        JPanel buttonsPanel = new JPanel();
        buttonsPanel.add(new JButton(startAction));
        buttonsPanel.add(new JButton(stopAction));

        textArea = new JTextArea(30, 50);
        JScrollPane scrollPane = new JScrollPane(textArea);

        JFrame frame = new JFrame("Test");
        frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
        frame.add(scrollPane);
        frame.add(buttonsPanel, BorderLayout.SOUTH);
        frame.pack();
        frame.setLocationRelativeTo(null);
        frame.setVisible(true);
    }

    private void startWriting() {
        stopWriting();
        worker = new SwingWorker<Void, String>() {
            @Override
            protected Void doInBackground() throws Exception {
                while(!isCancelled()) {
                    publish("Writing...\n");
                }
                return null;
            }

            @Override
            protected void process(List<String> chunks) {
                String string = chunks.get(chunks.size() - 1);
                textArea.append(string);
            }

            @Override
            protected void done() {
                textArea.append("Stopped!\n");
            }
        };
        worker.execute();
    }

    private void stopWriting() {
        if(worker != null && !worker.isCancelled()) {
            worker.cancel(true);
        }
    }

    public static void main(String[] args) {
        SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                new Demo().createAndShowGui();
            }
        });
    }
}
4b9b3361

Ответ 1

КРАТКОЙ ОТВЕТ:

Это происходит из-за того, что publish() напрямую не платит process, он устанавливает таймер, который будет запускать планирование блока process() в EDT после DELAY. Поэтому, когда работник отменен, все еще есть таймер, ожидающий запланировать процесс() с данными последнего опубликования. Причина использования таймера заключается в реализации оптимизации, когда один процесс может выполняться с объединенными данными нескольких изданий.

ДОЛГОЙ ОТВЕТ:

Посмотрим, как publish() и cancel взаимодействуют друг с другом, для чего давайте погрузиться в какой-то исходный код.

Сначала легкая часть, cancel(true):

public final boolean cancel(boolean mayInterruptIfRunning) {
    return future.cancel(mayInterruptIfRunning);
}

Это отмена завершает вызов следующего кода:

boolean innerCancel(boolean mayInterruptIfRunning) {
    for (;;) {
        int s = getState();
        if (ranOrCancelled(s))
            return false;
        if (compareAndSetState(s, CANCELLED)) // <-----
            break;
    }
    if (mayInterruptIfRunning) {
        Thread r = runner;
        if (r != null)
            r.interrupt(); // <-----
    }
    releaseShared(0);
    done(); // <-----
    return true;
}

Состояние SwingWorker установлено на CANCELLED, поток прерывается и вызывается done(), но это не выполняется SwingWorker, а future done(), который указан, когда переменная создается в Конструктор SwingWorker:

future = new FutureTask<T>(callable) {
    @Override
    protected void done() {
        doneEDT();  // <-----
        setState(StateValue.DONE);
    }
};

И код doneEDT():

private void doneEDT() {
    Runnable doDone =
        new Runnable() {
            public void run() {
                done(); // <-----
            }
        };
    if (SwingUtilities.isEventDispatchThread()) {
        doDone.run(); // <-----
    } else {
        doSubmit.add(doDone);
    }
}

Что вызывает SwingWorkers done() напрямую, если мы находимся в EDT, который является нашим делом. В этот момент SwingWorker должен остановиться, не более publish() следует вызвать, это достаточно просто продемонстрировать со следующей модификацией:

while(!isCancelled()) {
    textArea.append("Calling publish\n");
    publish("Writing...\n");
}

Однако мы все еще получаем сообщение "Написание..." от process(). Итак, давайте посмотрим, как называется процесс(). Исходным кодом для publish(...) является

protected final void publish(V... chunks) {
    synchronized (this) {
        if (doProcess == null) {
            doProcess = new AccumulativeRunnable<V>() {
                @Override
                public void run(List<V> args) {
                    process(args); // <-----
                }
                @Override
                protected void submit() {
                    doSubmit.add(this); // <-----
                }
            };
        }
    }
    doProcess.add(chunks);  // <-----
}

Мы видим, что run() Runnable doProcess - это тот, кто заканчивает вызов process(args), но этот код просто вызывает doProcess.add(chunks) not doProcess.run() и там тоже doSubmit. Посмотрим doProcess.add(chunks).

public final synchronized void add(T... args) {
    boolean isSubmitted = true;
    if (arguments == null) {
        isSubmitted = false;
        arguments = new ArrayList<T>();
    }
    Collections.addAll(arguments, args); // <-----
    if (!isSubmitted) { //This is what will make that for multiple publishes only one process is executed
        submit(); // <-----
    }
}

Итак, что действительно делает publish(), это добавление кусков в некоторый внутренний ArrayList arguments и вызов submit(). Мы только что увидели, что представляют только вызовы doSubmit.add(this), который является тем же самым методом add, поскольку оба doProcess и doSubmit расширяют AccumulativeRunnable<V>, однако на этот раз V находится Runnable вместо String > как в doProcess. Таким образом, кусок - это runnable, который вызывает process(args). Однако вызов submit() - это совершенно другой метод, определенный в классе doSubmit:

private static class DoSubmitAccumulativeRunnable
     extends AccumulativeRunnable<Runnable> implements ActionListener {
    private final static int DELAY = (int) (1000 / 30);
    @Override
    protected void run(List<Runnable> args) {
        for (Runnable runnable : args) {
            runnable.run();
        }
    }
    @Override
    protected void submit() {
        Timer timer = new Timer(DELAY, this); // <-----
        timer.setRepeats(false);
        timer.start();
    }
    public void actionPerformed(ActionEvent event) {
        run(); // <-----
    }
}

Создает таймер, который запускает код actionPerformed один раз после DELAY miliseconds. После запуска события код будет помечен в EDT, который вызовет внутренний run(), который заканчивается вызовом run(flush()) из doProcess и, таким образом, выполняется process(chunk), где chunk - это сброшенные данные arguments ArrayList. Я пропустил некоторые детали, цепочка вызовов "run" выглядит так:

  • doSubmit.run()
  • doSubmit.run(flush())//На самом деле цикл runnables, но будет иметь только один (*)
  • doProcess.run()
  • doProcess.run(flush())
  • процесс (фрагмент)

(*) Логические isSubmited и flush() (которые сбрасывают это логическое значение) делают так, что дополнительные вызовы для публикации не добавляют doProcess runnables для вызова в doSubmit.run(flush()), однако их данные не игнорируется. Таким образом, выполнение одного процесса для любого количества изданий, вызванных в течение срока действия таймера.

В целом, то, что publish("Writing...") выполняет, выполняет планирование вызова process(chunk) в EDT после DELAY. Это объясняет, почему даже после того, как мы отменили поток и больше не публикуется, все еще выполняется одно выполнение процесса, поскольку в тот момент, когда мы отменяем рабочего там (с большой вероятностью), таймер, который запланирует process() после done() уже планируется.

Почему этот таймер используется вместо планирования процесса() в EDT с помощью invokeLater(doProcess)? Чтобы реализовать оптимизацию производительности, описанную в docs:

Поскольку метод процесса вызывается асинхронно на Событии Dispatch Thread может вызвать несколько вызовов для метода публикации перед выполнением метода процесса. Для достижения всех целей эти призывы объединяются в один вызов с помощью конкатенированных аргументы.     Например:

 publish("1");
 publish("2", "3");
 publish("4", "5", "6");

might result in:
 process("1", "2", "3", "4", "5", "6")

Теперь мы знаем, что это работает, потому что все публикации, которые происходят в интервале DELAY, добавляют их args в эту внутреннюю переменную, которую мы видели arguments, и process(chunk) будет выполнять все эти данные за один раз.

ЭТО БЫСТРО? Временное решение?

Трудно сказать, если это ошибка или нет. Может быть, имеет смысл обрабатывать данные, которые опубликовал фоновый поток, поскольку работа действительно выполнена, и вам может быть интересно получить обновление графического интерфейса с такой же информацией как вы можете (если это то, что делает process(), например). И тогда, возможно, не имеет смысла, если done() требует, чтобы все обработанные данные и/или вызов process() после того, как done() создал несоответствия данных /GUI.

Существует очевидное обходное решение, если вы не хотите, чтобы какой-либо новый процесс() выполнялся после выполнения(), просто проверьте, отменен ли рабочий в методе process!

@Override
protected void process(List<String> chunks) {
    if (isCancelled()) return;
    String string = chunks.get(chunks.size() - 1);
    textArea.append(string);
}

Сложнее сделать make() выполнимым после этого последнего процесса(), например, сделать может просто использовать также таймер, который запланирует фактическую работу done() после > DELAY. Хотя я не могу думать, что это было бы обычным делом, поскольку, если вы отменили, не должно быть важно пропустить еще один процесс(), когда мы знаем, что мы фактически отменяем выполнение всех будущих.

Ответ 2

Прочитав превосходный ответ DSquare, и пришел к выводу, что потребуется некоторое подклассов, я придумал эту идею для всех, кто должен убедиться, что все опубликованные фрагменты были обработаны в EDT, прежде чем двигаться дальше.

NB Я пытался записать его на Java, а не на Jython (мой язык выбора и официально лучший язык в мире), но это немного сложно, потому что, например, publish есть final, так что вы 'd придется изобретать другой метод, чтобы называть его, а также потому, что вы должны (зевать) параметризовать все с generics в Java.

Этот код должен быть понятен любому человеку Java: просто чтобы помочь с self.publication_counter.get(), это оценивается как False, когда результат равен 0.

# this is how you say Worker... is a subclass of SwingWorker in Python/Jython
class WorkerAbleToWaitForPublicationToFinish( javax.swing.SwingWorker ):

    # __init__ is the constructor method in Python/Jython
    def __init__( self ):

        # we just add an "attribute" (here, publication_counter) to the object being created (self) to create a field of the new object
        self.publication_counter = java.util.concurrent.atomic.AtomicInteger()

    def await_processing_of_all_chunks( self ):
        while self.publication_counter.get():
            time.sleep( 0.001 )

    # fully functional override of the Java method     
    def process( self, chunks ):
        for chunk in chunks:
            pass
            # DO SOMETHING WITH EACH CHUNK

        # decrement the counter by the number of chunks received
        # NB do this AFTER dealing with the chunks 
        self.publication_counter.addAndGet( - len( chunks ) )

    # fully functional override of the Java method     
    def publish( self, *chunks ):
        # increment the counter by the number of chunks received
        # NB do this BEFORE publishing the chunks
        self.publication_counter.addAndGet( len( chunks ))
        self.super__publish( chunks )

Итак, в коде вызова вы добавили что-то вроде:

    engine.update_xliff_task.get()
    engine.update_xliff_task.await_processing_of_all_chunks()

PS использование предложения while, подобного этому (т.е. метода опроса), едва ли изящно. Я просмотрел доступные классы java.util.concurrent, такие как CountDownLatch и Phaser (оба с методами блокировки потоков), но я не думаю, что это сходило бы с этой целью...

позже

Я был заинтересован в этом, чтобы настроить правильный класс concurrency (написанный на Java, найденный на сайте Apache) под названием CounterLatch. Их версия останавливает поток в await() , если достигнуто значение счетчика AtomicLong. Моя версия здесь позволяет вам либо сделать это, либо наоборот: сказать "wait до счетчик достигает определенного значения до снятия защелки":

NB используйте AtomicLong для signal и AtomicBoolean для released: потому что в исходной Java они используют ключевое слово volatile. Я думаю, что использование атомных классов достигнет той же цели.

class CounterLatch():
    def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
        self.count = java.util.concurrent.atomic.AtomicLong( initial )
        self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )

        class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
            def tryAcquireShared( sync_self, arg ):
                if lift_on_reached:
                    return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
                else:
                    return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
            def tryReleaseShared( self, args ):
                return True

        self.sync = Sync()
        self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False

    def await( self, *args ):
        if args:
            assert len( args ) == 2
            assert type( args[ 0 ] ) is int
            timeout = args[ 0 ]
            assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
            unit = args[ 1 ]
            return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
        else:
            self.sync.acquireSharedInterruptibly( 1 )

    def count_relative( self, n ):
        previous = self.count.addAndGet( n )
        if previous == self.signal.get():
            self.sync.releaseShared( 0 )
        return previous

Итак, теперь мой код выглядит так:

В конструкторе SwingWorker:

self.publication_counter_latch = CounterLatch() 

В SW.publish:

self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )

В потоке, ожидающем завершения обработки фрагмента:

worker.publication_counter_latch.await()