第21章 – 并发 – 单一 生产者与消费者,多个生产者与多个消费者(P709)
第21章 – 并发 – 单一 生产者与消费者,多个生产者与多个消费者(P709)
1. 单一 生产者与消费者:一个餐馆只有一个厨师(生产者)和一个服务员(消费者)
代码如下,使用单一的锁restaurant.lockObj来协调厨师和服务员.
当meal为null时,服务员等待,厨师开始工作,反之服务员工作,厨师等待.
一个餐馆和一个厨师时,这种代码运行正常.
package concurrency; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; class WaitPerson1 implements Runnable { private Restaurant1 restaurant; public WaitPerson1(Restaurant1 r) { restaurant = r; } public void run() { try { while (!Thread.interrupted()) { //在单一的锁上同步 synchronized (restaurant.lockObj) { while (restaurant.meal == null) { restaurant.lockObj.wait(); // ... for the chef to produce a meal } System.out.println(Thread.currentThread().getName() + "____Waitperson got " + restaurant.meal); restaurant.meal = null; restaurant.lockObj.notifyAll(); // Ready for another } } } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + "_WaitPerson interrupted"); } } } class Chef1 implements Runnable { private Restaurant1 restaurant; public Chef1(Restaurant1 r) { restaurant = r; } public void run() { try { while (!Thread.interrupted()) { synchronized (restaurant.lockObj) { while (restaurant.meal != null) { restaurant.lockObj.wait(); } System.out.print(Thread.currentThread().getName() + "___Order up! "); restaurant.meal = new Meal(restaurant.mealNo++); restaurant.lockObj.notifyAll(); } } } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + "_Chef interrupted"); } } } public class Restaurant1 { Object lockObj = new Object(); Meal meal = null; int mealNo = 0; ExecutorService exec = Executors.newCachedThreadPool(); //启动饭馆 public void run(int chefCount,int waiterCount) { for(int i=0;i< waiterCount;i++) { exec.execute(new WaitPerson1(this)); } for(int i=0;i < chefCount;i++) { exec.execute(new Chef1(this)); } } //启动饭馆,一个厨师,一个服务员 public void run() { run(1,1); } //关闭饭馆 public void shutDown() { exec.shutdownNow(); } public static void main(String[] args) { Restaurant1 myRestaurant = new Restaurant1(); myRestaurant.run(); //1个厨师,一个服务员, //当有多个厨师,多个服务员时,会有死锁的问题 try { TimeUnit.MILLISECONDS.sleep(100);//指定程序执行时间 单位毫秒 myRestaurant.shutDown(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + "程序休眠抛出异常"); } } }
2. 当有多个厨师(生产者)和多个服务员(消费者)时,如果在单一锁上同步,则有产生死锁的风险.
将上面例子中 myRestaurant.run();
改成myRestaurant.run(10,10);即可启动10个厨师和10个服务员
但是 因为在单一锁上同步,则有产生死锁的风险,
而且,这种在同一个锁上同步的,将所有步骤的代码都放到synchronized中,
采用多个生产者消费者也没有意义(性能提升意义不大).
同一时刻,只能有一个生产者(或消费者)在运行.
3. 考虑一种场景: 即 餐馆中有若干个厨师和若干个服务员,厨师生产的meal放到mealList中,
当mealList.size() < maxMealCount时,厨师开始生产meal,否则等待
当mealList.size() > minMealCount 时,服务员开始消费meal,否则等待
多个厨师和服务员可以同时工作,以便提升服务速度.
问题分析:
现实生活中之所以需要多个厨师是因为,厨师生产meal是耗时的,即 new Meal()这个操作是耗时的,
另外,每个厨师制作meal是独立的,也就是说,使用多个生产者在new Meal()时可以独立运行,不用同步,所以
多个厨师可以同时并行生产meal,这样可以加快制作meal的速度,就相当于提升了性能.
对于每个服务员来说,拿到meal后,将meal送达制定餐桌是耗时的操作,同时也是独立的.
这里需要注意的是,厨师做好的meal放入mealList时需要同步处理,因为可能有多个厨师同时想要
将meal放入mealList,并且也可能有多个服务员打算从mealList中获取一个meal.
同步处理时,判断
当mealList.size() < maxMealCount时,厨师开始生产meal,否则等待
当mealList.size() > minMealCount 时,服务员开始消费meal,否则等待
另外,在多个厨师和多个服务员时,厨师调用锁的notifyAll()时,可能会唤醒其他厨师的线程,导致
这个唯一唤醒的线程又进入wait()了.所以使用synchronized(锁对象)的方式不适用于这里的多生产
者和多消费者的条件.
这里使用Lock和Condition. 一个Lock 的两个condition(cChefs和cWaitPerson)
在cChef await()的时候,调用cWaitPerson的signalAll(),只通知WaitPerson
在cWaitPerson await()时,调用cChef的signalAll(),只通知Chef
避免死锁.
代码如下:
package concurrency; import java.util.ArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Meal { private final int orderNum; public Meal(int orderNum) throws InterruptedException { TimeUnit.MILLISECONDS.sleep(100);//让当前线程休眠100秒,表示这是一个耗时操作. /** * 之前这种写法将InterruptedException 吞掉了,导致chef线程无法中断(因为在chef线程中调用new Meal()了). * 这里切记不能轻易的吞掉异常, * 正确的做法是使用throws InterruptedException将异常通知给调用方 try { TimeUnit.MILLISECONDS.sleep(100);//让当前线程休眠100秒,表示这是一个耗时操作. } catch (Exception e) { } */ this.orderNum = orderNum; } public String toString() { return "Meal " + orderNum; } } class WaitPerson2 implements Runnable { private Restaurant2 restaurant; public WaitPerson2(Restaurant2 r) { restaurant = r; } public void run() { Meal m = null; try { while (!Thread.interrupted()) { restaurant.lock.lock(); try { //mealList中的meal小于等于 规定的 list应持有的meal数量的最小值 //说明当前meal不足,不能消费,所以当前服务员等待 while(restaurant.mealList.size() <= restaurant.minMealCount) { restaurant.cWaitPerson.await(); } //运行到这里(木有wait),说明当前mealList中的meal数量,够了,则获取最后一个meal m = restaurant.mealList.remove(restaurant.mealList.size() - 1);//获取一个meal //通知厨师线程可以开始将制作好的meal放入到mealList中了 if(restaurant.mealList.size() <= restaurant.maxMealCount) { restaurant.cChef.signalAll(); } } finally { restaurant.lock.unlock(); } //这里已经释放锁了 TimeUnit.MILLISECONDS.sleep(100);//表示从mealList中拿到meal后的耗时操作 System.out.println(Thread.currentThread().getName() + "___ take: " + m); m = null; } } catch (InterruptedException e) { m = null; System.out.println(Thread.currentThread().getName() + "_WaitPerson interrupted"); } } } class Chef2 implements Runnable { private Restaurant2 restaurant; public Chef2(Restaurant2 r) { restaurant = r; } public void run() { Meal m = null; try { while (!Thread.interrupted()) { m = new Meal(restaurant.mealNo.addAndGet(1)); //使用原子整数,用addAndGet(1)方法替代++操作 //这是一个耗时操作,每个线程先把这个操作完成(可以多线程并行完成,以节约时间) //这里放到了lock之外运行, //假如放到lock里面的try后面第一句运行,则效率下降为原来的1/10了 restaurant.lock.lock(); try { //如果当前meal已经超过list中可持有的最大值,则当前厨师等待 while(restaurant.mealList.size() >= restaurant.maxMealCount) { restaurant.cChef.await(); } //运行到这里,说明没有在restaurant.cChef上等待, //即mealList中持有的meal数量没有达到restaurant.maxMealCount //此时,将新建的meal加入到list中 restaurant.mealList.add(m); System.out.println(Thread.currentThread().getName() + "___" + m + " ready..."); if(restaurant.mealList.size() > restaurant.minMealCount) { restaurant.cWaitPerson.signalAll(); } } finally { restaurant.lock.unlock(); } } } catch (InterruptedException e) { m = null; System.out.println(Thread.currentThread().getName() + "_Chef interrupted"); } } } public class Restaurant2 { Lock lock = new ReentrantLock(); Condition cChef = lock.newCondition(); Condition cWaitPerson = lock.newCondition(); int maxMealCount = 10; //meal 池 中已持有的meal的 最 大 数量 int minMealCount = 1; //meal 池 中已持有的meal的 最 小 数量 ArrayList<Meal> mealList= new ArrayList<Meal>(); //volatile int mealNo = 0; //使用volatile 不能保证原子性,因为mealNo++操作不是原子的,这样不同线程可以取到相同的值 AtomicInteger mealNo = new AtomicInteger(0); //使用原子整数,用addAndGet(1)方法替代++操作 ExecutorService exec = Executors.newCachedThreadPool(); //启动饭馆 public void run(int chefCount,int waiterCount) { for(int i=0;i< waiterCount;i++) { exec.execute(new WaitPerson2(this)); } for(int i=0;i < chefCount;i++) { exec.execute(new Chef2(this)); } } //关闭饭馆,中断所有的厨师和服务员线程 public void shutDown() { exec.shutdownNow(); } public static void main(String[] args) { int iChefCount = 10; int iWaitPersonCount = 10; Restaurant2 myRestaurant = new Restaurant2(); myRestaurant.run(iChefCount,iWaitPersonCount); try { TimeUnit.SECONDS.sleep(1); //让程序在1秒钟之后结束 myRestaurant.shutDown(); System.out.println(myRestaurant.mealList); //打印看看结束时mealList中内容是啥 } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + "程序休眠抛出异常"); } } }
赞 赏
微信赞赏 支付宝赞赏
本文固定链接: https://www.jack-yin.com/coding/thinking-in-java/2140.html | 边城网事