Подтвердить что ты не робот

Продюсер/Потребительские потоки с использованием очереди

Я хотел бы создать какое-то приложение для потоковой передачи Producer/Consumer. Но я не уверен, что лучший способ реализовать очередь между ними.

Итак, у меня есть кое-что с двумя идеями (оба из которых могут быть совершенно неправильными). Я хотел бы знать, что было бы лучше, и если они оба сосут то, что было бы лучшим способом реализовать очередь. Это в основном моя реализация очереди в этих примерах, о которых я беспокоюсь. Я расширяю класс Queue, который является домашним классом и является потокобезопасным. Ниже приведены два примера с 4 классами.

Основной класс -

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

Потребительский класс -

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

Класс производителя -

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

Класс очереди -

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

ИЛИ

Основной класс -

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

Потребительский класс -

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

Класс производителя -

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

Класс очереди -

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

И иди!

4b9b3361

Ответ 1

В Java 5+ есть все инструменты, необходимые для такого рода вещей. Вы захотите:

  • Поместите всех своих продюсеров в один ExecutorService;
  • Поместите всех своих потребителей в другой ExecutorService;
  • Если необходимо, общайтесь между ними, используя BlockingQueue.

Я говорю "если необходимо" для (3), потому что по моему опыту это лишний шаг. Все, что вы делаете, представляет новые задачи службе-потребителю. Итак:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

Итак, producers подчиняется непосредственно consumers.

Ответ 2

ОК, как отмечают другие, лучше всего использовать пакет java.util.concurrent. Я настоятельно рекомендую "Java Concurrency in Practice". Это отличная книга, которая охватывает почти все, что вам нужно знать.

Что касается вашей конкретной реализации, как я заметил в комментариях, не запускайте Threads from Constructors - это может быть опасно.

Оставляя это в стороне, вторая реализация выглядит лучше. Вы не хотите ставить очереди в статические поля. Вы, вероятно, просто теряете гибкость ни за что.

Если вы хотите продолжить свою собственную реализацию (для обучения, я полагаю?), поставьте start() метод как минимум. Вы должны создать объект (вы можете создать экземпляр объекта Thread), а затем вызвать start(), чтобы запустить поток.

Изменить: ExecutorService иметь свою очередь, так что это может ввести в заблуждение.. Вот что-то, чтобы вы начали.

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

Далее EDIT: Для производителя вместо while(true) вы можете сделать что-то вроде:

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

Таким образом вы можете завершить работу исполнителя, вызвав .shutdownNow(). Если вы используете while(true), он не отключится.

Также обратите внимание, что Producer по-прежнему уязвим для RuntimeExceptions (т.е. один RuntimeException остановит обработку)

Ответ 3

Вы изобретаете колесо.

Если вам нужна настойчивость и другие функции предприятия, используйте JMS (я бы предложил ActiveMq).

Если вам нужны быстрые очереди в памяти, используйте одну из импретактов java Queue.

Если вам нужно поддерживать java 1.4 или ранее, используйте Doug Lea отличный concurrent пакет.

Ответ 4

Я расширил предложенный cletus ответ на пример рабочего кода.

  • Один ExecutorService (pes) принимает задачи Producer.
  • Один ExecutorService (ces) принимает задачи Consumer.
  • Оба Producer и Consumer разделяют BlockingQueue.
  • Несколько задач Producer генерируют разные числа.
  • Любая из задач Consumer может потреблять число, сгенерированное с помощью Producer

код:

import java.util.concurrent.*;

public class ProducerConsumerWithES {
    public static void main(String args[]){
         BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

         ExecutorService pes = Executors.newFixedThreadPool(2);
         ExecutorService ces = Executors.newFixedThreadPool(2);

         pes.submit(new Producer(sharedQueue,1));
         pes.submit(new Producer(sharedQueue,2));
         ces.submit(new Consumer(sharedQueue,1));
         ces.submit(new Consumer(sharedQueue,2));
         // shutdown should happen somewhere along with awaitTermination
         / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
         pes.shutdown();
         ces.shutdown();
    }
}
class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

выход:

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1

Примечание. Если вам не нужны несколько продюсеров и потребителей, держите одного продюсера и потребителя. Я добавил нескольких продюсеров и потребителей, чтобы продемонстрировать возможности BlockingQueue среди нескольких производителей и потребителей.

Ответ 5

  • Java-код "BlockingQueue", который имеет синхронизированный метод put и get.
  • Код Java "Продюсер", поток производителя для создания данных.
  • Код Java "Потребитель", потребительский поток, чтобы потреблять полученные данные.
  • Java-код "ProducerConsumer_Main", основная функция для запуска потока производителей и потребителей.

BlockingQueue.java

public class BlockingQueue 
{
    int item;
    boolean available = false;

    public synchronized void put(int value) 
    {
        while (available == true)
        {
            try 
            {
                wait();
            } catch (InterruptedException e) { 
            } 
        }

        item = value;
        available = true;
        notifyAll();
    }

    public synchronized int get()
    {
        while(available == false)
        {
            try
            {
                wait();
            }
            catch(InterruptedException e){
            }
        }

        available = false;
        notifyAll();
        return item;
    }
}

Consumer.java

package com.sukanya.producer_Consumer;

public class Consumer extends Thread
{
    blockingQueue queue;
    private int number;
    Consumer(BlockingQueue queue,int number)
    {
        this.queue = queue;
        this.number = number;
    }

    public void run()
    {
        int value = 0;

        for (int i = 0; i < 10; i++) 
        {
            value = queue.get();
            System.out.println("Consumer #" + this.number+ " got: " + value);
        }
    }
}

ProducerConsumer_Main.java

package com.sukanya.producer_Consumer;

public class ProducerConsumer_Main 
{
    public static void main(String args[])
    {
        BlockingQueue queue = new BlockingQueue();
        Producer producer1 = new Producer(queue,1);
        Consumer consumer1 = new Consumer(queue,1);
        producer1.start();
        consumer1.start();
    }
}

Ответ 6

Это очень простой код.

import java.util.*;

// @author : rootTraveller, June 2017

class ProducerConsumer {
    public static void main(String[] args) throws Exception {
        Queue<Integer> queue = new LinkedList<>();
        Integer buffer = new Integer(10);  //Important buffer or queue size, change as per need.

        Producer producerThread = new Producer(queue, buffer, "PRODUCER");
        Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");

        producerThread.start();  
        consumerThread.start();
    }   
}

class Producer extends Thread {
    private Queue<Integer> queue;
    private int queueSize ;

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super(ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.size() == queueSize){
                    System.out.println(Thread.currentThread().getName() + " FULL         : waiting...\n");
                    try{
                        queue.wait();   //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue empty then produce one, add and notify  
                int randomInt = new Random().nextInt(); 
                System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
                queue.add(randomInt); 
                queue.notifyAll();  //Important
            } //synchronized ends here : NOTE
        }
    }
}

class Consumer extends Thread {
    private Queue<Integer> queue;
    private int queueSize;

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super (ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.isEmpty()){
                    System.out.println(Thread.currentThread().getName() + " Empty        : waiting...\n");
                    try {
                        queue.wait();  //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue empty then consume one and notify
                System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
                queue.notifyAll();
            } //synchronized ends here : NOTE
        }
    }
}