当前位置: 首页 > Thinking in Java > 正文

第21章 – 并发 – 新类库中构件 – Semaphore

第21章 – 并发 – 新类库中构件 – Semaphore

 

1. Semaphore 简介

 

   Semaphore 在构造函数中传入一个int参数,表示计数的信号量,在线程中调用Semaphore.acquire()方法时,

   检查信号量的值,如果信号量大于0则线程继续执行,然后将信号量减1,如果调用Semaphore.acquire()时,

   信号量已经是0了则调用线程会被阻塞.

 

   调用Semaphore.release()方法可以归还一个信号量,信号量归还之后,其他因为Semaphore.acquire()方法

   阻塞的线程就可以继续执行了.

 

   Semaphore常用于对象池的实现.

 

2. 示例代码(P733)

 

  (1) 使用泛型实现的一个资源池

package concurrency;
 
import java.util.concurrent.*;
import java.util.*;
 
public class Pool<T>
{
  private int                size;
  
  private List<T>            items = new ArrayList<T>();
  
  private volatile boolean[] checkedOut;
  //保存池中资源的状态的数组,表示资源是否被签出了
  private Semaphore          available;
 
  public Pool(Class<T> classObject, int size)
  {
    this.size = size;
    checkedOut = new boolean[size];
    available = new Semaphore(size, true);
    // Load pool with objects that can be checked out:
    //初始化时,将池子填满
    for (int i = 0; i < size; ++i)
      try
      {
        // Assumes a default constructor:
        items.add(classObject.newInstance());
      }
      catch (Exception e)
      {
        throw new RuntimeException(e);
      }
  }
 
  public T checkOut() throws InterruptedException
  {
    available.acquire();    //如果当前池子里面没有对象,会导致调用该方法线程阻塞
    return getItem();       //这个方法 需要同步
  }
 
  public void checkIn(T x)
  {
    if (releaseItem(x)) available.release();
  }
 
  private synchronized T getItem()
  {
    for (int i = 0; i < size; ++i)
      if (!checkedOut[i])
      {
        checkedOut[i] = true;
        return items.get(i);
      }
    return null; // Semaphore prevents reaching here
  }
 
  private synchronized boolean releaseItem(T item)
  {
    int index = items.indexOf(item);
    if (index == -1) return false; // Not in the list
    if (checkedOut[index])
    {
      checkedOut[index] = false;
      return true;
    }
    return false; // Wasn't checked out
  }
}

 (2) 创建一个池中资源对象示例,该对象创建的代价比较大,所以使用资源池来提升性能.

package concurrency;
 
public class Fat
{
  private volatile double d;                  // Prevent optimization
  private static int      counter = 0;
  private final int       id      = counter++;
 
  public Fat()
  {
    // Expensive, interruptible operation:
    for (int i = 1; i < 10000; i++)
    {
      d += (Math.PI + Math.E) / (double) i;
    }
  }
 
  public void operation()
  {
    System.out.println(this + " 调用 operation()");
  }
 
  public String toString()
  {
    return "Fat id: " + id;
  }
} // /:~
 
  
  (3) 使用Semaphore的Demo
  
  package concurrency;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
 
// A task to check a resource out of a pool:
class CheckoutTask<T> implements Runnable
{
  private static int counter = 0;
  private final int  id      = counter++;
  private Pool<T>    pool;
 
  public CheckoutTask(Pool<T> pool)
  {
    this.pool = pool;
  }
 
  public void run()
  {
    try
    {
      T item = pool.checkOut();
      System.out.println(this + "checked out " + item);
      TimeUnit.SECONDS.sleep(1);//模拟将资源签出,并使用了1秒钟后返回
      System.out.println(this + "checking in " + item);
      pool.checkIn(item);
    }
    catch (InterruptedException e)
    {
      // Acceptable way to terminate
    }
  }
 
  public String toString()
  {
    return "CheckoutTask " + id + " ";
  }
}
 
public class SemaphoreDemo
{
  final static int SIZE = 5;
 
  public static void main(String[] args) throws Exception
  {
    final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);
    ExecutorService exec = Executors.newCachedThreadPool();
    
    for (int i = 0; i < SIZE; i++)
      exec.execute(new CheckoutTask<Fat>(pool));
    System.out.println("All CheckoutTasks created");
    //每个线程都先checkout(),1秒后,checkin()
    //这里虽然先创建了线程,但是实际运行时是main()中线程先checkout资源.
    
    List<Fat> list = new ArrayList<Fat>();
    
    for (int i = 0; i < SIZE; i++)
    {
      Fat f = pool.checkOut();
      System.out.println(i + ": main() thread checked out ");
      f.operation();
      list.add(f);
    }
    //main()线程也尝试checkout()所有资源(Fat)
    
    Future<?> blocked = exec.submit(new Runnable()
    {
      public void run()
      {
        try
        {
          // Semaphore prevents additional checkout,
          // so call is blocked:
          pool.checkOut();
        }
        catch (InterruptedException e)
        {
          System.out.println("checkOut() Interrupted");
        }
      }
    });
    
    TimeUnit.SECONDS.sleep(2);
    blocked.cancel(true); // Break out of blocked call
    
    System.out.println("Checking in objects in " + list);
    
    for (Fat f : list)
      pool.checkIn(f);
    for (Fat f : list)
      pool.checkIn(f); // Second checkIn ignored
    exec.shutdown();
  }
} 

 

赞 赏

   微信赞赏  支付宝赞赏


本文固定链接: https://www.jack-yin.com/coding/thinking-in-java/2126.html | 边城网事

该日志由 边城网事 于2015年03月18日发表在 Thinking in Java 分类下, 你可以发表评论,并在保留原文地址及作者的情况下引用到你的网站或博客。
原创文章转载请注明: 第21章 – 并发 – 新类库中构件 – Semaphore | 边城网事
关键字: ,

第21章 – 并发 – 新类库中构件 – Semaphore 暂无评论

发表评论

快捷键:Ctrl+Enter