第21章 – 并发 – 新类库中构件 – PriorityBlockingQueue
第21章 – 并发 – 新类库中构件 – PriorityBlockingQueue
1. PriorityBlockingQueue 简介
public class PriorityBlockingQueue <E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
该类是一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。
虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。
此类不允许使用 null 元素。
依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。
此类及其迭代器可以实现 Collection 和 Iterator 接口的所有可选 方法。
iterator() 方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。
如果需要有序地进行遍历,则应考虑使用 Arrays.sort(pq.toArray())。
此外,可以使用方法 drainTo 按优先级顺序移除 全部或部分元素,并将它们放在另一个 collection 中。
在此类上进行的操作不保证具有同等优先级的元素的顺序。
如果需要实施某一排序,那么可以定义自定义类或者比较器,比较器可使用修改键断开主优先级值之间的联系。
例如,以下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。
要使用该类,则需要插入一个新的 FIFOEntry(anEntry) 来替换普通的条目对象。
该类使用与类 PriorityQueue 相同的顺序规则.而PriorityQueue的顺序规则如下:
优先级队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的 Comparator 进行排序,
具体取决于所使用的构造方法。优先级队列不允许使用 null 元素。
依靠自然顺序的优先级队列还不允许插入不可比较的对象(这样做可能导致 ClassCastException)。
Java集合类中的排序分为两种,自然排序和在构造函数时传入Comparator进行排序.
(1) 自然排序是指,放入该队列中的元素必须实现comparable接口.
(2) 使用public interface Comparator<T> 接口.该接口包含方法:
int compare(T o1, T o2) 比较用来排序的两个参数。
boolean equals(Object obj) 指示某个其他对象是否“等于”此 Comparator。
2. 示例代码:
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>
这个类实现了Comparable,可作为放入PriorityBlockingQueue中的元素.
PrioritizedTask在构造函数中传入一个整数作为优先级,优先级最高的排在队列头部,由take()取出.
PriorityBlockingQueue中的元素不能加入重复的元素(Comparable接口方法返回0的表示重复元素)
每次只有一个线程从PriorityBlockingQueue中take()元素(是一个Runnable),然后在当前线程中执行该元素
的run()方法.
package concurrency; import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> { private Random rand = new Random(47); private static int counter = 0; private final int id = counter++; private final int priority; protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>(); public PrioritizedTask(int priority) { this.priority = priority; sequence.add(this); } //优先级越大则排序越靠前,优先级最高的排在队列的头部,这样take()方法获取的头部元素都是优先级最高的 public int compareTo(PrioritizedTask arg) { return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0); } public void run() { try { TimeUnit.MILLISECONDS.sleep(rand.nextInt(250)); } catch (InterruptedException e) { // Acceptable way to exit } System.out.println(this); } public String toString() { return String.format("[%1$-3d]", priority) + " Task " + id; } public String summary() { return "(" + id + ":" + priority + ")"; } public static class EndSentinel extends PrioritizedTask { private ExecutorService exec; public EndSentinel(ExecutorService e) { super(-1); // Lowest priority in this program exec = e; } public void run() { int count = 0; for (PrioritizedTask pt : sequence) { System.out.print(pt.summary()); if (++count % 5 == 0) System.out.println(); } System.out.println(); System.out.println(this + " Calling shutdownNow()"); exec.shutdownNow(); } } } class PrioritizedTaskProducer implements Runnable { private Random rand = new Random(47); private Queue<Runnable> queue; private ExecutorService exec; public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) { queue = q; exec = e; // Used for EndSentinel } public void run() { // Unbounded queue; never blocks. // Fill it up fast with random priorities: for (int i = 0; i < 20; i++) { queue.add(new PrioritizedTask(rand.nextInt(10))); Thread.yield(); } // Trickle in highest-priority jobs: try { for (int i = 0; i < 10; i++) { TimeUnit.MILLISECONDS.sleep(250); queue.add(new PrioritizedTask(10)); } // Add jobs, lowest priority first: for (int i = 0; i < 10; i++) queue.add(new PrioritizedTask(i)); // A sentinel to stop all the tasks: queue.add(new PrioritizedTask.EndSentinel(exec)); } catch (InterruptedException e) { // Acceptable way to exit } System.out.println("Finished PrioritizedTaskProducer"); System.out.println(queue); } } class PrioritizedTaskConsumer implements Runnable { private PriorityBlockingQueue<Runnable> q; public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) { this.q = q; } public void run() { try { while (!Thread.interrupted()) // Use current thread to run the task: q.take().run(); } catch (InterruptedException e) { // Acceptable way to exit } System.out.println("Finished PrioritizedTaskConsumer"); } } public class PriorityBlockingQueueDemo { public static void main(String[] args) throws Exception { Random rand = new Random(47); ExecutorService exec = Executors.newCachedThreadPool(); PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>(); exec.execute(new PrioritizedTaskProducer(queue, exec)); exec.execute(new PrioritizedTaskConsumer(queue)); } }
赞 赏
微信赞赏
支付宝赞赏
本文固定链接: https://www.jack-yin.com/coding/thinking-in-java/2122.html | 边城网事