当前位置: 首页 > Thinking in Java > 正文

第21章 – 并发 – 单一 生产者与消费者,多个生产者与多个消费者(P709)

第21章 – 并发 – 单一 生产者与消费者,多个生产者与多个消费者(P709)

 

1. 单一 生产者与消费者:一个餐馆只有一个厨师(生产者)和一个服务员(消费者)

 

   代码如下,使用单一的锁restaurant.lockObj来协调厨师和服务员.

   当meal为null时,服务员等待,厨师开始工作,反之服务员工作,厨师等待.

 

   一个餐馆和一个厨师时,这种代码运行正常.

package concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


class WaitPerson1 implements Runnable
{
  private Restaurant1 restaurant;

  public WaitPerson1(Restaurant1 r)
  {
    restaurant = r;
  }

  public void run()
  {
    try
    {
      while (!Thread.interrupted())
      {
        //在单一的锁上同步
        synchronized (restaurant.lockObj)
        {
          while (restaurant.meal == null)
          {
            restaurant.lockObj.wait(); // ... for the chef to produce a meal
          }
          System.out.println(Thread.currentThread().getName() + "____Waitperson got " + restaurant.meal);
          restaurant.meal = null;
          restaurant.lockObj.notifyAll(); // Ready for another
        }
      }
    }
    catch (InterruptedException e)
    {
      System.out.println(Thread.currentThread().getName() + "_WaitPerson interrupted");
    }
  }
}

class Chef1 implements Runnable
{
  private Restaurant1 restaurant;

  public Chef1(Restaurant1 r)
  {
    restaurant = r;
  }

  public void run()
  {
    try
    {
      while (!Thread.interrupted())
      {
        synchronized (restaurant.lockObj) 
        {
          while (restaurant.meal != null)
          {
            restaurant.lockObj.wait(); 
          }
          
          System.out.print(Thread.currentThread().getName() + "___Order up! ");
          restaurant.meal = new Meal(restaurant.mealNo++);
          restaurant.lockObj.notifyAll();
        }
      }
    }
    catch (InterruptedException e)
    {
      System.out.println(Thread.currentThread().getName() + "_Chef interrupted");
    }
  }
}

public class Restaurant1
{
  Object lockObj = new Object();
  Meal            meal = null;
  int mealNo = 0;
  ExecutorService exec       = Executors.newCachedThreadPool();
  
  //启动饭馆
  public void run(int chefCount,int waiterCount)
  {
    for(int i=0;i< waiterCount;i++)
    {
      exec.execute(new WaitPerson1(this));
    }
    
    for(int i=0;i < chefCount;i++)
    {
      exec.execute(new Chef1(this));
    }
    
  }
  
  //启动饭馆,一个厨师,一个服务员
  public void run()
  {
    run(1,1);
  }
  
  //关闭饭馆
  public void shutDown()
  {
    exec.shutdownNow();
  }

  public static void main(String[] args)
  {
    Restaurant1 myRestaurant = new Restaurant1();
    myRestaurant.run();
    //1个厨师,一个服务员,
    //当有多个厨师,多个服务员时,会有死锁的问题
    try
    {
      TimeUnit.MILLISECONDS.sleep(100);//指定程序执行时间 单位毫秒
      myRestaurant.shutDown();
    }
    catch (InterruptedException e)
    {
      System.out.println(Thread.currentThread().getName() + "程序休眠抛出异常");
    }
  }
} 

 

2. 当有多个厨师(生产者)和多个服务员(消费者)时,如果在单一锁上同步,则有产生死锁的风险.

 

   将上面例子中 myRestaurant.run(); 

   改成myRestaurant.run(10,10);即可启动10个厨师和10个服务员

   但是 因为在单一锁上同步,则有产生死锁的风险,

   而且,这种在同一个锁上同步的,将所有步骤的代码都放到synchronized中,

   采用多个生产者消费者也没有意义(性能提升意义不大).

   同一时刻,只能有一个生产者(或消费者)在运行.

 

3. 考虑一种场景: 即 餐馆中有若干个厨师和若干个服务员,厨师生产的meal放到mealList中,

   当mealList.size() < maxMealCount时,厨师开始生产meal,否则等待

   当mealList.size() > minMealCount 时,服务员开始消费meal,否则等待

   多个厨师和服务员可以同时工作,以便提升服务速度.

 

   问题分析:

   现实生活中之所以需要多个厨师是因为,厨师生产meal是耗时的,即 new Meal()这个操作是耗时的,

   另外,每个厨师制作meal是独立的,也就是说,使用多个生产者在new Meal()时可以独立运行,不用同步,所以

   多个厨师可以同时并行生产meal,这样可以加快制作meal的速度,就相当于提升了性能.

 

   对于每个服务员来说,拿到meal后,将meal送达制定餐桌是耗时的操作,同时也是独立的.

 

   这里需要注意的是,厨师做好的meal放入mealList时需要同步处理,因为可能有多个厨师同时想要

   将meal放入mealList,并且也可能有多个服务员打算从mealList中获取一个meal.

 

   同步处理时,判断

   当mealList.size() < maxMealCount时,厨师开始生产meal,否则等待

   当mealList.size() > minMealCount 时,服务员开始消费meal,否则等待

 

   另外,在多个厨师和多个服务员时,厨师调用锁的notifyAll()时,可能会唤醒其他厨师的线程,导致

   这个唯一唤醒的线程又进入wait()了.所以使用synchronized(锁对象)的方式不适用于这里的多生产

   者和多消费者的条件.

 

   这里使用Lock和Condition. 一个Lock 的两个condition(cChefs和cWaitPerson)

   在cChef await()的时候,调用cWaitPerson的signalAll(),只通知WaitPerson

   在cWaitPerson await()时,调用cChef的signalAll(),只通知Chef

   避免死锁.

   代码如下:

package concurrency;

import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Meal
{
  private final int orderNum;

  public Meal(int orderNum) throws InterruptedException
  {
    TimeUnit.MILLISECONDS.sleep(100);//让当前线程休眠100秒,表示这是一个耗时操作.
    
    /**
     * 之前这种写法将InterruptedException 吞掉了,导致chef线程无法中断(因为在chef线程中调用new Meal()了).
     * 这里切记不能轻易的吞掉异常,
     * 正确的做法是使用throws InterruptedException将异常通知给调用方
    try
    {
      TimeUnit.MILLISECONDS.sleep(100);//让当前线程休眠100秒,表示这是一个耗时操作.
    }
    catch (Exception e)
    {
    }
    */
    
    this.orderNum = orderNum;
  }

  public String toString()
  {
    return "Meal " + orderNum;
  }
}

class WaitPerson2 implements Runnable
{
  private Restaurant2 restaurant;

  public WaitPerson2(Restaurant2 r)
  {
    restaurant = r;
  }

  public void run()
  {
    Meal m = null;
    try
    {
      while (!Thread.interrupted())
      {
        restaurant.lock.lock();
        try
        {
          //mealList中的meal小于等于 规定的 list应持有的meal数量的最小值
          //说明当前meal不足,不能消费,所以当前服务员等待
          while(restaurant.mealList.size() <= restaurant.minMealCount)
          {
            restaurant.cWaitPerson.await();
          }
          
          //运行到这里(木有wait),说明当前mealList中的meal数量,够了,则获取最后一个meal
          m = restaurant.mealList.remove(restaurant.mealList.size() - 1);//获取一个meal
          
          //通知厨师线程可以开始将制作好的meal放入到mealList中了
          if(restaurant.mealList.size() <= restaurant.maxMealCount)
          {
            restaurant.cChef.signalAll();
          }
        }
        finally
        {
          restaurant.lock.unlock();
        }
        
        //这里已经释放锁了
        TimeUnit.MILLISECONDS.sleep(100);//表示从mealList中拿到meal后的耗时操作
        System.out.println(Thread.currentThread().getName() + "___ take: " + m);
        m = null;
      }
    }
    catch (InterruptedException e)
    {
      m = null;
      System.out.println(Thread.currentThread().getName() + "_WaitPerson interrupted");
    }
  }
}

class Chef2 implements Runnable
{
  private Restaurant2 restaurant;

  public Chef2(Restaurant2 r)
  {
    restaurant = r;
  }

  public void run()
  {
    Meal m = null;
    try
    {
      while (!Thread.interrupted())
      {
        
        m = new Meal(restaurant.mealNo.addAndGet(1)); //使用原子整数,用addAndGet(1)方法替代++操作
        //这是一个耗时操作,每个线程先把这个操作完成(可以多线程并行完成,以节约时间)
        //这里放到了lock之外运行,
        //假如放到lock里面的try后面第一句运行,则效率下降为原来的1/10了
        
        restaurant.lock.lock();
        try
        {
          //如果当前meal已经超过list中可持有的最大值,则当前厨师等待
          while(restaurant.mealList.size() >= restaurant.maxMealCount)
          {
            restaurant.cChef.await();
          }
          
          //运行到这里,说明没有在restaurant.cChef上等待,
          //即mealList中持有的meal数量没有达到restaurant.maxMealCount
          //此时,将新建的meal加入到list中
          restaurant.mealList.add(m);
          System.out.println(Thread.currentThread().getName() + "___" + m + " ready...");
          
          if(restaurant.mealList.size() > restaurant.minMealCount)
          {
            restaurant.cWaitPerson.signalAll();
          }
        }
        finally
        {
          restaurant.lock.unlock();
        }
      }
    }
    catch (InterruptedException e)
    {
      m = null;
      System.out.println(Thread.currentThread().getName() + "_Chef interrupted");
    }
  }
}

public class Restaurant2
{
  
  Lock lock = new ReentrantLock();
  Condition cChef = lock.newCondition();
  Condition cWaitPerson = lock.newCondition();
  
  int maxMealCount = 10; //meal 池 中已持有的meal的 最 大 数量
  int minMealCount = 1;  //meal 池 中已持有的meal的 最 小 数量
  
  ArrayList<Meal> mealList= new ArrayList<Meal>();
  
  //volatile int  mealNo = 0; 
  //使用volatile 不能保证原子性,因为mealNo++操作不是原子的,这样不同线程可以取到相同的值
  AtomicInteger mealNo = new AtomicInteger(0);
  //使用原子整数,用addAndGet(1)方法替代++操作
  ExecutorService exec       = Executors.newCachedThreadPool();
  
  //启动饭馆
  public void run(int chefCount,int waiterCount)
  {
    for(int i=0;i< waiterCount;i++)
    {
      exec.execute(new WaitPerson2(this));
    }
    
    for(int i=0;i < chefCount;i++)
    {
      exec.execute(new Chef2(this));
    }
    
  }
  
  //关闭饭馆,中断所有的厨师和服务员线程
  public void shutDown()
  {
    exec.shutdownNow();
  }

  public static void main(String[] args)
  {
    
    int iChefCount = 10;
    int iWaitPersonCount = 10;
    
    Restaurant2 myRestaurant = new Restaurant2();
    
    myRestaurant.run(iChefCount,iWaitPersonCount);
    
    try
    {
      TimeUnit.SECONDS.sleep(1); //让程序在1秒钟之后结束
      myRestaurant.shutDown();
      System.out.println(myRestaurant.mealList); //打印看看结束时mealList中内容是啥
    }
    catch (InterruptedException e)
    {
      System.out.println(Thread.currentThread().getName() + "程序休眠抛出异常");
    }
    
  }
} 

 

 

赞 赏

   微信赞赏  支付宝赞赏


本文固定链接: https://www.jack-yin.com/coding/thinking-in-java/2140.html | 边城网事

该日志由 边城网事 于2015年03月18日发表在 Thinking in Java 分类下, 你可以发表评论,并在保留原文地址及作者的情况下引用到你的网站或博客。
原创文章转载请注明: 第21章 – 并发 – 单一 生产者与消费者,多个生产者与多个消费者(P709) | 边城网事
关键字: ,

第21章 – 并发 – 单一 生产者与消费者,多个生产者与多个消费者(P709) 暂无评论

发表评论

快捷键:Ctrl+Enter