Подтвердить что ты не робот

Многопоточность производителя/потребителя

Фон

Не имея денег на учебу в школе, я работаю в ночных сменах на платной дороге и использую Интернет, чтобы научить себя некоторым навыкам кодирования, надеясь на лучшую работу завтра или онлайн-продажу какого-либо приложения, которое я делаю. Длинные ночи, немногие клиенты.

Я рассматриваю многопоточность как тему, поскольку я сталкиваюсь с большим количеством кода в литературе (например, Android SDK), который использует его, но я все еще нахожу его неясным.

Дух

Мой подход на этом этапе: попытайтесь закодировать самый простой пример многопоточности, о котором я могу думать, немного ударить головой о стену и посмотреть, могу ли я растянуть мой мозг, чтобы разместить новый образ мышления. Я подвергаю себя своим пределам, надеюсь, превзойти их. Не стесняйтесь критиковать дико, вплоть до nitpicking, и указывать лучшие способы делать то, что я пытаюсь сделать.

Цель

  • Get some advice on how to do the above, based on my efforts so far (code provided)

Упражнение

Здесь определяется область действия:

Определение

Создайте два класса, которые работают в тандеме по созданию объектов данных и их потреблению. Один Thread создает объекты и доставляет их в разделяемое пространство для другого, чтобы забрать и потреблять. Позвольте вызвать производящий поток Producer, потребляющий поток Consumer и общее пространство SharedSpace. Акт создания объектов для потребления другими может быть ассимилирован с помощью аналогии с этим сценарием:

`Producer`    (a busy mum making chocolate-covered cakes for his child, up to a limit)
`Consumer`    (a hungry child waiting to eat all cakes the mum makes, until told to stop)
`SharedSpace` (a kitchen table on which the cakes are put as soon as they become ready)
`dataValue`   (a chocolate-dripping cake which MUST be eaten immediately or else...)

Чтобы упростить упражнение, я решил не разрешать маме готовить, когда ребенок ест свой торт. Она просто ждет, пока ребенок закончит свой торт и мгновенно сделает еще один, до определенного предела, для хорошего воспитания. Суть упражнения состоит в том, чтобы практиковать сигнализацию Thread по достижению любого concurrency вообще. Напротив, я сосредоточен на идеальной сериализации, без опроса или "я могу пойти еще?". чеки. Полагаю, мне придется процитировать последующее упражнение, в котором мать и ребенок "работают" параллельно.

Подход

  • Попросите мои классы реализовать интерфейс Runnable, чтобы у них была точка ввода кода

  • Используйте мои классы как аргументы конструктора для объектов Thread, которые создаются и запускаются из программы main entry точка

  • Убедитесь, что программа main не заканчивается перед Thread с помощью Thread.join()

  • Задайте ограничение на количество раз, когда Producer создаст данные для Consumer

  • Согласитесь на значение дозорного значения, которое Produce будет использовать для завершения передачи данных.

  • Логарифмическое получение блокировок на общих ресурсах и событиях производства/потребления данных, включая окончательное отключение рабочих потоков

  • Создайте один SharedSpace объект из программы main и передайте его каждому работнику перед запуском

  • Хранить private ссылку на объект SharedSpace внутри каждого рабочего

  • Предоставить защиту и сообщения, чтобы описать условие готовности Consumer к употреблению до того, как будут созданы какие-либо данные.

  • Остановите Producer после заданного количества итераций

  • Остановите Consumer после того, как он прочитает значение часового

код


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class Consumer extends Threaded {
  public Consumer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int consumedData = 0;
    while (consumedData != -1) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        consumedData = sharedSpace.dataValue;
        if (consumedData == 0) {
          try {
            logger.info("Data production has not started yet. "
                + "Releasing lock on sharedSpace, "
                + "until notification that it has begun.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        } else if (consumedData == -1) {
          logger.info("Consumed: END (end of data production token).");
        } else {
          logger.info("Consumed: {}.", consumedData);
          logger.info("Waking up producer to continue data production.");
          sharedSpace.notify();
          try {
            logger.info("Releasing lock on sharedSpace "
                + "until notified of new data availability.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class Producer extends Threaded {
  private static final int N_ITERATIONS = 10;
  public Producer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int nIterations = 0;
    while (nIterations <= N_ITERATIONS) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        nIterations++;
        if (nIterations <= N_ITERATIONS) {
          sharedSpace.dataValue = nIterations;
          logger.info("Produced: {}", nIterations);
        } else {
          sharedSpace.dataValue = -1;
          logger.info("Produced: END (end of data production token).");
        }
        logger.info("Waking up consumer for data consumption.");
        sharedSpace.notify();
        if (nIterations <= N_ITERATIONS) {
          try {
            logger.info("Releasing lock on sharedSpace until notified.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class SharedSpace {
  volatile int dataValue = 0;
}
abstract class Threaded implements Runnable {
  protected Logger logger;
  protected SharedSpace sharedSpace;
  public Threaded(SharedSpace sharedSpace) {
    this.sharedSpace = sharedSpace;
    logger = LoggerFactory.getLogger(this.getClass());
  }
  @Override
  public void run() {
    logger.info("Started.");
    String workerName = getClass().getName();
    Thread.currentThread().setName(workerName);
  }
}
public class ProducerConsumer {
  public static void main(String[] args) {
    SharedSpace sharedSpace = new SharedSpace();
    Thread producer = new Thread(new Producer(sharedSpace), "Producer");
    Thread consumer = new Thread(new Consumer(sharedSpace), "Consumer");
    producer.start();
    consumer.start();
    try {
      producer.join();
      consumer.join();
    } catch (InterruptedException interruptedException) {
      interruptedException.printStackTrace();
    }
  }
}

Журнал выполнения


Consumer - Started.
Consumer - Acquired lock on sharedSpace.
Consumer - Data production has not started yet. Releasing lock on sharedSpace, until notification that it has begun.
Producer - Started.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 1
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 1.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 2
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 2.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 3
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 3.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 4
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 4.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 5
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 5.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 6
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 6.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 7
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 7.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 8
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 8.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 9
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 9.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 10
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 10.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: END (end of data production token).
Producer - Waking up consumer for data consumption.
Producer - Signing off.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: END (end of data production token).
Consumer - Signing off.

Вопрос

  • Правильно ли это? (например, использует ли он правильные языковые инструменты, правильный подход, содержит ли он какой-то глупый код,...)

Но это выглядит "правильно"?

Я спрашиваю о правильности, даже если результат "выглядит хорошо", потому что вы не можете себе представить, сколько раз что-то пошло не так в моем тестировании "один раз", а не "другое" (например, когда потребитель начал сначала, когда Продюсер никогда не уходит после производства часового и т.д.). Я научился не требовать правильности от "успешного запуска". Напротив, я стал очень подозрительным в отношении псевдопараллельного кода! (по определению это не является даже параллельным! 0

Расширенные ответы

Хороший вопрос фокусируется только на one requested piece of advice (выше), но не стесняйтесь упоминать какие-либо другие темы в вашем ответе, если хотите:

  • Как я могу протестировать параллельный код, когда буду вводить следующие мои попытки?

  • Какие инструменты могут помочь мне в разработке и отладке? Рассмотрим, что я использую Eclipse

  • Будет ли изменен подход, если я разрешаю Producer продолжать производить, причем каждое производство занимает некоторое переменное время, в то время как Consumer потребляет все, что становится доступным? Следует ли запереть замок в другом месте? Должна ли сигнализация измениться с этой парадигмы ожидания/уведомления?

  • Этот метод делает вещи устаревшими и должен ли я изучать что-то еще? Из этой таблички я понятия не имею, что происходит "в реальном мире Java"

Следующие шаги

  • Куда мне идти дальше? Я видел понятие "фьючерсы", упомянутое где-то, но я мог бы использовать нумерованный список тем для работы последовательно, с педалогически упорядоченным, со ссылками на связанные учебные ресурсы.

Тино Сино

4b9b3361

Ответ 1

Правильно ли это указано?

Единственная проблема, которую я вижу, это то, что было упомянуто @Tudor и @Bhaskar. Всякий раз, когда вы проверяете условие, когда вы его ожидаете, вы должны использовать цикл while. Однако речь идет скорее о гоночных условиях с несколькими производителями и потребителями. Возможно появление ложных пробуждений, но условия гонки намного более вероятны. Смотрите мою страницу по теме.

Да, у вас только 1 производитель и 1 потребитель, но вы можете попытаться расширить свой код для нескольких потребителей или скопировать код в другой сценарий.

Я научился не требовать правильности от "успешного запуска". Напротив, я стал очень подозрительно относиться к псевдопараллельному коду!

Хороший инстинкт.

Как я могу протестировать параллельный код, когда буду вводить следующие мои попытки?

Это очень сложно. Масштабирование - это один из способов. Добавьте нескольких производителей и потребителей и посмотрите, есть ли проблемы. Работает на нескольких архитектурах с разными числами/типами процессоров. Ваша лучшая защита будет правильной. Тесная синхронизация, хорошее использование классов BlockingQueue, ExecutorService и т.д., Чтобы сделать ваш сайт более простым и чистым.

Нет простого ответа. Тестирование многопоточного кода чрезвычайно сложно.

Какие инструменты могут помочь мне в разработке и отладке?

Что касается общих вещей, я бы посмотрел на инструмент покрытия, например Emma, чтобы вы могли убедиться в том, что ваши модульные тесты покрывают весь ваш код.

В терминах тестирования многопоточного кода узнайте, как читать kill -QUIT потоки-дампы и посмотреть на выполнение потоков внутри Jconsole. Провайдеры Java, такие как YourKit, также могут помочь.

Будет ли этот подход изменяться, если я позволю продюсеру продолжать производство, причем каждое производство занимает некоторое переменное время...

Я так не думаю. Потребитель будет ждать продюсера навсегда. Может быть, я не понимаю этот вопрос?

Этот метод делает вещи устаревшими и должен ли я изучать что-то еще? Из этой таблички я понятия не имею, что происходит "в реальном мире Java"

Теперь узнаем о ExecutorService classes. Они обрабатывают большой процент кода стиля new Thread(), особенно когда вы имеете дело с несколькими асинхронными задачами, выполняемыми с потоками. Здесь tutorial.

Куда мне идти дальше?

Опять же, ExecutorService. Я предполагаю, что вы прочитали этот начальный docs. Как отметил @Bhaskar, Java Concurrency in Practice является хорошей библией.


Вот некоторые общие комментарии о вашем коде:

  • Классы SharedSpace и Threaded кажутся ухищренным способом сделать это. Если вы играете с базовыми классами и т.п., То отлично. Но в общем, я никогда не использую такой шаблон. Производители и потребители обычно работают с BlockingQueue как LinkedBlockingQueue, и в этом случае для обработки данных синхронизации и volatile вы. Кроме того, я склонен вводить общую информацию в конструктор объекта, а не получать его из базового класса.

  • Обычно, если я использую synchronized, он находится в поле private final. Часто я создаю private final Object lockObject = new Object(); для блокировки, если я не работаю с объектом уже.

  • Будьте осторожны с огромными блоками synchronized и помещайте сообщения журнала внутри разделов synchronized. Журналы обычно делают synchronized IO в файловую систему, которая может быть очень дорогой. У вас должны быть маленькие, очень плотные, synchronized блоки, если это возможно.

  • Вы определяете consumedData вне цикла. Я бы определил его в точке назначения, а затем использовал break для запирания из цикла, если он == -1. Обязательно ограничьте область локальных переменных, если это вообще возможно.

  • Ваши сообщения о регистрации будут доминировать над производительностью вашего кода. Это означает, что когда вы их удаляете, ваш код будет работать совершенно по-другому. Это очень важно, когда вы начинаете отлаживать проблемы с ним. Производительность также (скорее всего) изменится, когда вы перейдете к другой архитектуре с разными процессорами/ядрами.

  • Вы, вероятно, знаете это, но когда вы вызываете sharedSpace.notify();, это означает, что другой поток уведомляется, если он в настоящее время находится в sharedSpace.wait();. Если это не что-то другое, оно пропустит уведомление. Просто FYI.

  • Немного странно делать if (nIterations <= N_ITERATIONS), а затем 3 строки ниже else делать это снова. Дублирование notify() было бы лучше упростить ветвление.

  • У вас есть int nIterations = 0;, затем a while, затем внутри a ++. Это рецепт цикла for:

    for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
    

Здесь гораздо более плотная версия вашего кода. Это всего лишь пример того, как я буду писать. Опять же, помимо отсутствующего while, в вашей версии нет ничего плохого.

public class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    public Consumer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       while (true) {
          int consumedData = queue.take();
          if (consumedData ==  Producer.FINAL_VALUE) {
              logger.info("Consumed: END (end of data production token).");
              break;
          }
          logger.info("Consumed: {}.", consumedData);
       }
       logger.info("Signing off.");
    }
}

public class Producer implements Runnable {
    public static final int FINAL_VALUE = -1;
    private final BlockingQueue<Integer> queue;
    public Producer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
          logger.info("Produced: {}", nIterations);
          queue.put(nIterations);
       }
       queue.put(FINAL_VALUE);
       logger.info("Produced: END (end of data production token).");
       logger.info("Signing off.");
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
       // you can add an int argument to the LinkedBlockingQueue constructor
       // to only allow a certain number of items in the queue at one time
       BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
       Thread producer = new Thread(new Producer(queue), "Producer");
       Thread consumer = new Thread(new Consumer(queue), "Consumer");
       // start and join go here
    }
}

Ответ 2

Вы, кажется, неплохо справились здесь. На самом деле это не так много. Думаю, что я хотел бы рекомендовать, вам следует избегать синхронизации на самом буфере. В этом случае это нормально, но при условии, что вместо этого вы переключитесь на буфер структуры данных, в зависимости от класса, который он может быть синхронизирован внутренне (например, Vector, хотя он устарел до сих пор), поэтому приобретение блокировки извне может испортить его.

Изменить: Bhaskar делает хороший аргумент в пользу использования while для переноса вызовов на wait. Это из-за печально известных побочных пробуждений, которые могут произойти, прежде чем выпустить поток из wait преждевременно, поэтому вам нужно убедиться, что он вернулся.

Что вы можете сделать дальше - это реализовать потребителя конечного буфера-производителя: иметь некоторую общую структуру данных, например. связанный список и установить максимальный размер (например, 10 элементов). Затем пусть продюсер продолжает производить и только приостанавливает его, когда в очереди 10 предметов. Потребитель будет заблокирован всякий раз, когда буфер пуст.

Следующие шаги, которые вы можете предпринять, - это научиться автоматизировать процесс, который вы выполнили вручную. Взгляните на BlockingQueue, который обеспечивает буфер с возможностью блокировки (т.е. Потребитель будет автоматически блокировать, если буфер пуст, и производитель будет блокировать, если он заполнен).

Кроме того, в зависимости от ситуации, исполнители (посмотрите ExecutorService) могут стать достойной заменой, поскольку они инкапсулируют очередь задач и одного или нескольких рабочих (потребителей), поэтому все, что вам нужно, это производитель.

Ответ 3

Producer и Consumer могут быть простыми классами, реализующими Runnable (no extends Threaded). Таким образом, они менее хрупкие. Клиенты могут создавать темы Thread и присоединять экземпляры, чтобы не требовалось накладных расходов иерархии классов.

Ваше состояние перед вами wait() должно быть while(), а не if.

изменить: с JCIP-страницы 301:

void stateDependentMethod() throws InterruptedException {
      // condition predicate must be guarded by lock
      synchronized(lock) {
          while (!conditionPredicate())
            lock.wait();
          // object is now in desired state
       }
  }

У вас есть условие, чтобы остановить статически. Обычно производители и потребители должны быть более гибкими - они должны иметь возможность реагировать на внешний сигнал для остановки.

Для начала, для реализации внешнего сигнала останова, у вас есть флаг:

class Producer implements Runnable { 
     private volatile boolean stopRequested ;

     public void run() {
        while(true){
           if(stopRequested )
                // get out of the loop
         }
     }

     public void stop(){
        stopRequested  = true;
        // arrange to  interrupt the Producer thread here.
     }
 }

Когда вы попытаетесь реализовать вышеизложенное, вы, вероятно, увидите, что возникают другие осложнения - например, ваш производитель сначала публикует, а затем wait() ing, но это может привести к проблемам.

Если вас интересует дальнейшее чтение, я предлагаю прочитать книгу - Java Concurrency In Practice. Это будет иметь множество рекомендаций, чем я могу добавить здесь.