Подтвердить что ты не робот

Java: Как поток может ждать нескольких объектов?

Нить может использовать Object.wait() для блокировки, пока другой поток не вызовет notify() или notifyAll() на этом объекте.

Но что, если поток хочет подождать до тех пор, пока не будет отмечен один из нескольких объектов? Например, мой поток должен ждать, пока или a) байт станет доступным для чтения из InputStream или b) элемент добавляется к ArrayList.

Как поток может ожидать какого-либо из этих событий?

ИЗМЕНИТЬ

Этот вопрос связан с ожиданием завершения нескольких потоков - в моем случае задействован поток, ожидающий, когда один из многих объектов будет заменен.

4b9b3361

Ответ 1

Нить не может ждать более чем по одному объекту за раз.

Методы wait() и notify() являются объектно-ориентированными. Метод wait() приостанавливает текущий поток выполнения и сообщает объекту отслеживать приостановленный поток. Метод notify() указывает объекту пробудить приостановленные потоки, которые он отслеживает.

Полезная ссылка: Может ли поток вызвать wait() на двух замках сразу в Java (6)?

Ответ 2

Вы находитесь в мире боли. Используйте абстракцию более высокого уровня, такую ​​как очередь блокирующих сообщений, из которой поток может потреблять сообщения, такие как "больше байтов доступно" или "добавленный элемент".

Ответ 3

Они могут использовать один и тот же мьютекс. Вы, потребитель, ожидаете этого мьютекса, оба других уведомляют об этом мьютексе, когда первый может продолжить.

Ответ 4

Вы можете ждать только на одном мониторе. Поэтому уведомители должны уведомить об этом один монитор. Другого пути в этой низкоуровневой синхронизации нет.

Ответ 5

Немного поздно, но это очень интересный вопрос! Казалось бы, вы действительно можете ждать нескольких условий, с одинаковой производительностью и без дополнительных потоков; Это вопрос определения проблемы! Я потратил время, чтобы написать более подробное объяснение в комментариях к коду ниже. По запросу я извлечу абстракцию:

Таким образом, на самом деле ожидание на нескольких объектах совпадает с ожиданием при нескольких условиях. Но следующий шаг - объединить ваши под-условия в состояние -net-условие a-единственное условие. И когда любой компонент условия заставит его стать истинным, вы переверните логическое значение и сообщите об этом блокировке (как и любое другое условие ожидания ожидания).

Мой подход:

Для любого условия это может привести только к двум значениям (true и false). Как это значение создается, не имеет значения. В вашем случае ваше "функциональное условие" - это когда одно из двух значений истинно: (value_a || value_b). Я называю это "функциональное условие" "Nexus-Point". Если вы примените точку зрения, что любое сложное условие - независимо от того, насколько сложно - всегда дает простой результат (истинный или ложный), то то, что вы действительно просите, - "Что приведет к тому, что мое чистое состояние станет истинным?" (Предполагая, что логика "Подождите до истины" ). Таким образом, когда поток приводит к тому, что компонент вашего условия становится истинным (значение value_a или value_b равно true в вашем случае), и вы знаете, что это приведет к тому, что ваше желаемое условие -net будет выполнено, тогда вы можете упростить подход к классическому (в том, что он переворачивает один булевский флаг и освобождает блокировку). С помощью этой концепции вы можете применить подход, ориентированный на объект, чтобы помочь ясности вашей общей логики:

import java.util.HashSet;
import java.util.Set;

/**
 * The concept is that all control flow operation converge
 * to a single value: true or false. In the case of N
 * components in which create the resulting value, the
 * theory is the same. So I believe this is a matter of
 * perspective and permitting 'simple complexity'. for example:
 *
 * given the statement:
 *      while(condition_a || condition_b || ...) { ... }
 *
 * you could think of it as:
 *      let C = the boolean -resulting- value of (condition_a || condition_b || ...),
 *      so C = (condition_a || condition_b || ...);
 *
 * Now if we were to we-write the statement, in lamest-terms:
 *      while(C) { ... }
 *
 * Now if you recognise this form, you'll notice its just the standard
 * syntax for any control-flow statement?
 *
 *      while(condition_is_not_met) {
 *          synchronized (lock_for_condition) {
 *              lock_for_condition.wait();
 *            }
 *      }
 *
 * So in theory, even if the said condition was evolved from some
 * complex form, it should be treated as nothing more then if it
 * was in the simplest form. So whenever a component of the condition,
 * in which cause the net-condition (resulting value of the complex
 * condition) to be met, you would simply flip the boolean and notify
 * a lock to un-park whoever is waiting on it. Just like any standard
 * fashion.
 *
 * So thinking ahead, if you were to think of your given condition as a
 * function whos result is true or false, and takes the parameters of the states
 * in which its comprised of (  f(...) = (state_a || state_b && state_c), for example )
 * then you would recognize "If I enter this state, in which this I know would
 * cause that condition/lock to become true, I should just flip the switch switch,
 * and notify".
 *
 * So in your example, your 'functional condition' is:
 *      while(!state_a && !state_b) {
 *          wait until state a or state b is false ....
 *      }
 *
 * So armed with this mindset, using a simple/assertive form,
 * you would recognize that the overall question:
 * -> What would cause my condition to be true? : if  state_a is true OR state_b is true
 * Ok... So, that means: When state_a or state_b turn true, my overall condition is met!
 * So... I can just simplify this thing:
 *
 *      boolean net_condition = ...
 *      final Object lock = new Lock();
 *
 *      void await() {
 *          synchronized(lock) {
 *              while(!net_condition) {
 *                  lock.wait();
 *              }
 *           }
 *       }
 *
 * Almighty, so whenever I turn state_a true, I should just flip and notify
 * the net_condition!
 *
 *
 *
 * Now for a more expanded form of the SAME THING, just more direct and clear:
 *
 * @author Jamie Meisch
 */
public class Main {


    /**
     *
     * The equivalent if one was to "Wait for one of many condition/lock to
     * be notify me when met" :
     *
     *      synchronized(lock_a,lock_b,lock_c) {
     *          while(!condition_a || !condition_b || !condition_c) {
     *              condition_a.wait();
     *              condition_b.wait();
     *              condition_c.wait();
     *          }
     *      }
     *
     */
    public static void main(String... args) {

        OrNexusLock lock = new OrNexusLock();
        // The workers register themselves as their own variable as part of the overall condition,
        // in which is defined by the OrNuxusLock custom-implement. Which will be true if any of
        // the given variables are true
        SpinningWarrior warrior_a = new SpinningWarrior(lock,1000,5);
        SpinningWarrior warrior_b = new SpinningWarrior(lock,1000,20);
        SpinningWarrior warrior_c = new SpinningWarrior(lock,1000,50);

        new Thread(warrior_a).start();
        new Thread(warrior_b).start();
        new Thread(warrior_c).start();

        // So... if any one of these guys reaches 1000, stop waiting:
        // ^ As defined by our implement within the OrNexusLock


        try {
            System.out.println("Waiting for one of these guys to be done, or two, or all! does not matter, whoever comes first");
            lock.await();
            System.out.println("WIN: " + warrior_a.value() + ":" + warrior_b.value() + ":" + warrior_c.value());
        } catch (InterruptedException ignored) {
        }

    }


    // For those not using Java 8 :)
    public interface Condition {
        boolean value();
    }

    /**
     * A variable in which the net locks 'condition function'
     * uses to determine its overall -net- state.
     */
    public static class Variable {

        private final Object lock;
        private final Condition con;

        private Variable(Object lock, Condition con) {
            this.lock = lock;
            this.con  = con;
        }

        public boolean value() {
            return con.value();
        }

        //When the value of the condition changes, this should be called
        public void valueChanged() {
            synchronized (lock) {
                lock.notifyAll();
            }
        }

    }



    /**
     *
     * The lock has a custom function in which it derives its resulting
     * -overall- state (met, or not met). The form of the function does
     * not matter, but it only has boolean variables to work from. The
     * conditions are in their abstract form (a boolean value, how ever
     * that sub-condition is met). It important to retain the theory
     * that complex conditions yeild a simple result. So expressing a
     * complex statement such as ( field * 5 > 20 ) results in a simple
     * true or false value condition/variable is what this approach is
     * about. Also by centerializing the overal logic, its much more
     * clear then the raw -simplest- form (listed above), and just
     * as fast!
     */
    public static abstract class NexusLock {
        private final Object lock;

        public NexusLock() {
            lock = new Object();
        }

        //Any complex condition you can fathom!
        //Plus I prefer it be consolidated into a nexus point,
        // and not asserted by assertive wake-ups
        protected abstract boolean stateFunction();

        protected Variable newVariable(Condition condition) {
            return new Variable(lock, condition);
        }

        //Wait for the overall condition to be met
        public void await() throws InterruptedException {
            synchronized (lock) {
                while (!stateFunction()) {
                    lock.wait();
                }
            }
        }

    }

    // A implement in which any variable must be true
    public static class OrNexusLock extends NexusLock {


        private final Set<Variable> vars = new HashSet<>();

        public OrNexusLock() {
        }


        public Variable newVar(Condition con) {
            Variable var = newVariable(con);
            vars.add(var); //register it as a general component of or net condition       // We should notify the thread since our functional-condition has changed/evolved:
            synchronized (lock) { lock.notifyAll(); }
            return var;
        }

        @Override
        public boolean stateFunction() { //Our condition for this lock
            // if any variable is true: if(var_a || var_b || var_c || ...)

            for(Variable var : vars) {
                if(var.value() == true) return true;
            }
            return false;
        }

    }

    //increments a value with delay, the condition is met when the provided count is reached
    private static class SpinningWarrior implements Runnable, Condition {

        private final int count;
        private final long delay;
        private final Variable var;

        private int tick = 0;

        public SpinningWarrior(OrNexusLock lock, int count, long delay) {
            this.var   = lock.newVar(this);
            this.count = count; //What to count to?
            this.delay = delay;
        }

        @Override
        public void run() {
            while (state_value==false) { //We're still counting up!
                tick++;
                chkState();
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ignored) {
                    break;
                }
            }
        }

        /**
         * Though redundant value-change-notification are OK,
         * its best to prevent them. As such its made clear to
         * that we will ever change state once.
         */
        private boolean state_value = false;
        private void chkState() {
            if(state_value ==true) return;
            if(tick >= count) {
                state_value = true;
                var.valueChanged(); //Our value has changed
            }
        }

        @Override
        public boolean value() {
            return state_value; //We could compute our condition in here, but for example sake.
        }

    }


}

Ответ 6

Блокировка в обоих случаях над одним и тем же объектом. Вызовите в случае a) или в случае b) уведомите() на том же объекте.

Ответ 7

Похоже, что в вашем случае вы ожидаете "уведомления" из двух разных источников. Возможно, вам не придется "ждать" (как в обычном java synchronized(object) object.wait()) на этих двух объектах как таковых, но попросите их обоих поговорить с очередью или что нет (как упоминают другие ответы, некоторые блокирующие коллекции, такие как LinkedBlockingQueue).

Если вы действительно хотите "подождать" на двух разных Java-объектах, вы можете сделать это, применив некоторые из принципов этого ответа: fooobar.com/questions/25029/... (в основном новый поток, каждый из которых выполняет ожидание на каждом из ожидаемых объектов, уведомляет об этом основной поток, когда сам объект уведомляется), но может быть нелегко управлять синхронизированными аспектами.

Ответ 8

Чтобы обработать завершение любого потока из заданного набора , не дожидаясь завершения всех, в качестве монитора можно использовать выделенный общий объект (lastExited ниже) (wait() и notify() в блоках synchronized). Дополнительные мониторы необходимы для обеспечения того, чтобы в любое время вышло не более одного потока (notifyExitMutex), и не более одного потока ожидает выхода любого потока (waitAnyExitMonitor); поэтому пары wait()/notify() всегда относятся к разным блокам.

Пример (все окончания процесса обрабатываются в порядке завершения потоков):

import java.util.Random;

public class ThreadMonitor {

    private final Runnable[] lastExited = { null };

    private final Object notifyExitMutex = new Object();
    public void startThread(final Runnable runnable) {
        (new Thread(new Runnable() { public void run() {
            try { runnable.run(); } catch (Throwable t) { }
            synchronized (notifyExitMutex) {
                synchronized (lastExited) {
                    while (true) {
                        try {
                            if (lastExited[0] != null) lastExited.wait();
                            lastExited[0] = runnable;
                            lastExited.notify();
                            return;
                        }
                        catch (InterruptedException e) { }
                    }
                }
            }
        }})).start();
    }

    private final Object waitAnyExitMutex = new Object();
    public Runnable waitAnyExit() throws InterruptedException {
        synchronized (waitAnyExitMutex) {
            synchronized (lastExited) {
                if (lastExited[0] == null) lastExited.wait();
                Runnable runnable = lastExited[0];
                lastExited[0] = null;
                lastExited.notify();
                return runnable;
            }
        }
    }

    private static Random random = new Random();
    public static void main(String[] args) throws InterruptedException {
        ThreadMonitor threadMonitor = new ThreadMonitor();

        int threadCount = 0;
        while (threadCount != 100) {
            Runnable runnable = new Runnable() { public void run() {
                try { Thread.sleep(1000 + random.nextInt(100)); }
                catch (InterruptedException e) { }
            }};
            threadMonitor.startThread(runnable);
            System.err.println(runnable + " started");
            threadCount++;
        }

        while (threadCount != 0) {
            Runnable runnable = threadMonitor.waitAnyExit();
            System.err.println(runnable + " exited");
            threadCount--;
        }
    }
}