当前位置: 首页 > Thinking in Java > 正文

第21章 – 并发 – 新类库中构件 – PriorityBlockingQueue

第21章 – 并发 – 新类库中构件 – PriorityBlockingQueue

 

1. PriorityBlockingQueue 简介

 

   public class PriorityBlockingQueue  <E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable

 

   该类是一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。

   虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。

   此类不允许使用 null 元素。

   依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。 

 

此类及其迭代器可以实现 Collection 和 Iterator 接口的所有可选 方法。

iterator() 方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。

如果需要有序地进行遍历,则应考虑使用 Arrays.sort(pq.toArray())。

此外,可以使用方法 drainTo 按优先级顺序移除 全部或部分元素,并将它们放在另一个 collection 中。 

 

在此类上进行的操作不保证具有同等优先级的元素的顺序。

如果需要实施某一排序,那么可以定义自定义类或者比较器,比较器可使用修改键断开主优先级值之间的联系。

例如,以下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。

要使用该类,则需要插入一个新的 FIFOEntry(anEntry) 来替换普通的条目对象。

 

该类使用与类 PriorityQueue 相同的顺序规则.而PriorityQueue的顺序规则如下:

优先级队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的 Comparator 进行排序,

具体取决于所使用的构造方法。优先级队列不允许使用 null 元素。

依靠自然顺序的优先级队列还不允许插入不可比较的对象(这样做可能导致 ClassCastException)。 

 

Java集合类中的排序分为两种,自然排序和在构造函数时传入Comparator进行排序.

(1) 自然排序是指,放入该队列中的元素必须实现comparable接口.

(2) 使用public interface Comparator<T> 接口.该接口包含方法:

   int compare(T o1, T o2) 比较用来排序的两个参数。 

 boolean equals(Object obj) 指示某个其他对象是否“等于”此 Comparator。 

 

2. 示例代码:

 

   class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>

   这个类实现了Comparable,可作为放入PriorityBlockingQueue中的元素.

   PrioritizedTask在构造函数中传入一个整数作为优先级,优先级最高的排在队列头部,由take()取出.

   PriorityBlockingQueue中的元素不能加入重复的元素(Comparable接口方法返回0的表示重复元素)

 

   每次只有一个线程从PriorityBlockingQueue中take()元素(是一个Runnable),然后在当前线程中执行该元素

   的run()方法.

package concurrency;

		import java.util.ArrayList;
		import java.util.List;
		import java.util.Queue;
		import java.util.Random;
		import java.util.concurrent.ExecutorService;
		import java.util.concurrent.Executors;
		import java.util.concurrent.PriorityBlockingQueue;
		import java.util.concurrent.TimeUnit;
		
		class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>
		{
		  private Random                         rand     = new Random(47);
		  private static int                     counter  = 0;
		  private final int                      id       = counter++;
		  private final int                      priority;
		  protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();
		
		  public PrioritizedTask(int priority)
		  {
		    this.priority = priority;
		    sequence.add(this);
		  }
		
		  //优先级越大则排序越靠前,优先级最高的排在队列的头部,这样take()方法获取的头部元素都是优先级最高的
		  public int compareTo(PrioritizedTask arg)
		  {
		    return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0);
		  }
		
		  public void run()
		  {
		    try
		    {
		      TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
		    }
		    catch (InterruptedException e)
		    {
		      // Acceptable way to exit
		    }
		    System.out.println(this);
		  }
		
		  public String toString()
		  {
		    return String.format("[%1$-3d]", priority) + " Task " + id;
		  }
		
		  public String summary()
		  {
		    return "(" + id + ":" + priority + ")";
		  }
		
		  public static class EndSentinel extends PrioritizedTask
		  {
		    private ExecutorService exec;
		
		    public EndSentinel(ExecutorService e)
		    {
		      super(-1); // Lowest priority in this program
		      exec = e;
		    }
		
		    public void run()
		    {
		      int count = 0;
		      for (PrioritizedTask pt : sequence)
		      {
		        System.out.print(pt.summary());
		        if (++count % 5 == 0) System.out.println();
		      }
		      System.out.println();
		      System.out.println(this + " Calling shutdownNow()");
		      exec.shutdownNow();
		    }
		  }
		}
		
		class PrioritizedTaskProducer implements Runnable
		{
		  private Random          rand = new Random(47);
		  private Queue<Runnable> queue;
		  private ExecutorService exec;
		
		  public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e)
		  {
		    queue = q;
		    exec = e; // Used for EndSentinel
		  }
		
		  public void run()
		  {
		    // Unbounded queue; never blocks.
		    // Fill it up fast with random priorities:
		    for (int i = 0; i < 20; i++)
		    {
		      queue.add(new PrioritizedTask(rand.nextInt(10)));
		      Thread.yield();
		    }
		    // Trickle in highest-priority jobs:
		    try
		    {
		      for (int i = 0; i < 10; i++)
		      {
		        TimeUnit.MILLISECONDS.sleep(250);
		        queue.add(new PrioritizedTask(10));
		      }
		      // Add jobs, lowest priority first:
		      for (int i = 0; i < 10; i++)
		        queue.add(new PrioritizedTask(i));
		      // A sentinel to stop all the tasks:
		      queue.add(new PrioritizedTask.EndSentinel(exec));
		    }
		    catch (InterruptedException e)
		    {
		      // Acceptable way to exit
		    }
		    System.out.println("Finished PrioritizedTaskProducer");
		    System.out.println(queue);
		    
		  }
		}
		
		class PrioritizedTaskConsumer implements Runnable
		{
		  private PriorityBlockingQueue<Runnable> q;
		
		  public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q)
		  {
		    this.q = q;
		  }
		
		  public void run()
		  {
		    try
		    {
		      while (!Thread.interrupted())
		        // Use current thread to run the task:
		        q.take().run();
		    }
		    catch (InterruptedException e)
		    {
		      // Acceptable way to exit
		    }
		    System.out.println("Finished PrioritizedTaskConsumer");
		  }
		}
		
		public class PriorityBlockingQueueDemo
		{
		  public static void main(String[] args) throws Exception
		  {
		    Random rand = new Random(47);
		    ExecutorService exec = Executors.newCachedThreadPool();
		    PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
		    exec.execute(new PrioritizedTaskProducer(queue, exec));
		    exec.execute(new PrioritizedTaskConsumer(queue));
		  }
		} 

 

赞 赏

   微信赞赏  支付宝赞赏


本文固定链接: https://www.jack-yin.com/coding/thinking-in-java/2122.html | 边城网事

该日志由 边城网事 于2015年03月18日发表在 Thinking in Java 分类下, 你可以发表评论,并在保留原文地址及作者的情况下引用到你的网站或博客。
原创文章转载请注明: 第21章 – 并发 – 新类库中构件 – PriorityBlockingQueue | 边城网事
关键字: ,

第21章 – 并发 – 新类库中构件 – PriorityBlockingQueue 暂无评论

发表评论

快捷键:Ctrl+Enter