第21章 – 并发 – BlcokingQueue
第21章 – 并发 – BlcokingQueue
1. BlockingQueue简介
BlockingQueue 是一个接口.要求其实现是线程安全的.
BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,
某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。
BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,
超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue
总是报告 Integer.MAX_VALUE 的剩余容量。
BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。
因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。
然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。
BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的
并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、
retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。
因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。
BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。
这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,
插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。
2. BlockingQueue 接口的继承关系
Iterable <- Collection <- Queue <- BlockingQueue
(1) public interface Iterable<T> 接口只有一个方法
Iterator<T> iterator() 返回一个在一组 T 类型的元素上进行迭代的迭代器。
Iterator用来迭代集合(Collection)中元素,用三个方法:
boolean hasNext() 如果仍有元素可以迭代,则返回 true。
E next() 返回迭代的下一个元素。
void remove() 从迭代器指向的 collection 中移除迭代器返回的最后一个元素(可选操作)。
(2) public interface Collection<E>extends Iterable<E>
方法为:
boolean add(E e) 确保此 collection 包含指定的元素(可选操作)。
boolean addAll(Collection<? extends E> c) 将指定 collection 中的所有元素都添加到此 collection 中(可选操作)。
void clear() 移除此 collection 中的所有元素(可选操作)。
boolean contains(Object o) 如果此 collection 包含指定的元素,则返回 true。
boolean containsAll(Collection<?> c) 如果此 collection 包含指定 collection 中的所有元素,则返回 true。
boolean equals(Object o) 比较此 collection 与指定对象是否相等。
int hashCode() 返回此 collection 的哈希码值。
boolean isEmpty() 如果此 collection 不包含元素,则返回 true。
Iterator<E> iterator() 返回在此 collection 的元素上进行迭代的迭代器。
boolean remove(Object o) 从此 collection 中移除指定元素的单个实例,如果存在的话(可选操作)。
boolean removeAll(Collection<?> c) 移除此 collection 中那些也包含在指定 collection 中的所有元素(可选操作)。
boolean retainAll(Collection<?> c) 仅保留此 collection 中那些也包含在指定 collection 的元素(可选操作)。
int size() 返回此 collection 中的元素数。
Object[] toArray() 返回包含此 collection 中所有元素的数组。
<T> T[] toArray(T[] a) 返回包含此 collection 中所有元素的数组;返回数组的运行时类型与指定数组的运行时类型相同。
(3) public interface Queue<E>extends Collection<E>
方法为:
boolean add(E e) 将指定的元素插入此队列(如果立即可行且不会违反容量限制),在成功时返回 true,
如果当前没有可用的空间,则抛出 IllegalStateException。
E element() 获取,但是不移除此队列的头。
boolean offer(E e) 将指定的元素插入此队列(如果立即可行且不会违反容量限制),当使用有容量限制的队列时,
此方法通常要优于 add(E),后者可能无法插入元素,而只是抛出一个异常。
E peek() 获取但不移除此队列的头;如果此队列为空,则返回 null。
E poll() 获取并移除此队列的头,如果此队列为空,则返回 null。
E remove() 获取并移除此队列的头。如果此队列为空抛异常 NoSuchElementException
该接口中不同的方法可能有相同的行为,比如返回add和offer,但是它们也有细微差别:
在这些操作失败时,处理方式不同
抛出异常 返回特殊值
插入 add(e) offer(e) (插入失败,则返回false)
移除 remove() poll() (队列为空,返回null)
检查 element() peek() (队列为空,返回null)
(4) public interface BlockingQueue<E>extends Queue<E>
方法为:
boolean add(E e) 将指定元素插入此队列中(如果立即可行且不会违反容量限制),
成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。
boolean contains(Object o) 如果此队列包含指定元素,则返回 true。
int drainTo(Collection<? super E> c) 移除此队列中所有可用的元素,
并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements) 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
boolean offer(E e) 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。
boolean offer(E e, long timeout, TimeUnit unit) 将指定元素插入此队列中,在到达指定的等待时间前等待可用的空间(如果有必要)。
E poll(long timeout, TimeUnit unit) 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
void put(E e) 将指定元素插入此队列中,将等待可用的空间(如果有必要)。
int remainingCapacity() 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的附加元素数量;
如果没有内部限制,则返回 Integer.MAX_VALUE。
boolean remove(Object o) 从此队列中移除指定元素的单个实例(如果存在)。
E take() 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
上面接口方法可知,这些接口继承关系比较凌乱,有的方法每个接口都有,比如add方法.所以使用这些接口的实现类时,
重点关注最下层的接口中的方法.多使用与该接口关系最近的方法,避免使用该接口的祖先接口中的方法.
比如 BlockingQueue 常用于实现生产者-消费者问题,最常用方法take()和put(),offer(),poll(),等
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,
这四种形式的处理方式不同:
第一种是抛出一个异常,
第二种是返回一个特殊值(null 或 false,具体取决于操作),
第三种是在操作可以成功前,无限期地阻塞当前线程,
第四种是在放弃前只在给定的最大时间限制内阻塞。
下表中总结了这些方法:
抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
3. BlockingQueue 接口实现类
(1)public class ArrayBlockingQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。
队列的头部 是在队列中存在时间最长的元素。
队列的尾部 是在队列中存在时间最短的元素。
新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。
一旦创建了这样的缓存区,就不能再增加其容量。
试图向已满队列中放入元素会导致操作受阻塞;
试图从空队列中提取元素将导致类似阻塞。
此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。
然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。
公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。
(2) public class LinkedBlockingQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。
队列的头部 是在队列中时间最长的元素。
队列的尾部 是在队列中时间最短的元素。
新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。
链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。
如果未指定容量,则它等于 Integer.MAX_VALUE。
除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
此类及其迭代器实现 Collection 和 Iterator 接口的所有可选 方法。
(3)public class PriorityBlockingQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。
虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。
此类不允许使用 null 元素。
依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。
此类及其迭代器可以实现 Collection 和 Iterator 接口的所有可选 方法。
iterator() 方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。
如果需要有序地进行遍历,则应考虑使用 Arrays.sort(pq.toArray())。
此外,可以使用方法 drainTo 按优先级顺序移除 全部或部分元素,并将它们放在另一个 collection 中。
在此类上进行的操作不保证具有同等优先级的元素的顺序。
如果需要实施某一排序,那么可以定义自定义类或者比较器,比较器可使用修改键断开主优先级值之间的联系。
例如,以下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。
要使用该类,则需要插入一个新的 FIFOEntry(anEntry) 来替换普通的条目对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> { final static AtomicLong seq = new AtomicLong(); final long seqNum; final E entry; public FIFOEntry(E entry) { seqNum = seq.getAndIncrement(); this.entry = entry; } public E getEntry() { return entry; } public int compareTo(FIFOEntry<E> other) { int res = entry.compareTo(other.entry); if (res == 0 && other.entry != this.entry) res = (seqNum < other.seqNum ? -1 : 1); return res; } } |
(4) public class SynchronousQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。
同步队列没有任何内部容量,甚至连一个队列的容量都没有。
不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;
除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;
也不能迭代队列,因为其中没有元素可用于迭代。
队列的头 是尝试添加到队列中的首个已排队插入线程的元素;
如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。
对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。
此队列不允许 null 元素。
同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。
它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、
事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。
对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。
默认情况下不保证这种排序。
但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。
(5) public class DelayQueue<E extends Delayed>extends AbstractQueue<E>implements BlockingQueue<E>
Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。
该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。
如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。
当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。
即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。
例如,size 方法同时返回到期和未到期元素的计数。
此队列不允许使用 null 元素。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。
(6)public class LinkedBlockingDeque<E>extends AbstractQueue<E>implements BlockingDeque<E>, Serializable
一个基于已链接节点的、任选范围的阻塞双端队列。
可选的容量范围构造方法参数是一种防止过度膨胀的方式。
如果未指定容量,那么容量将等于 Integer.MAX_VALUE。
只要插入元素不会使双端队列超出容量,每次插入后都将动态地创建链接节点。
大多数操作都以固定时间运行(不计阻塞消耗的时间)。
异常包括 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove()
以及批量操作,它们均以线性时间运行。
此类及其迭代器实现 Collection 和 Iterator 接口的所有可选 方法。
p.s. 这个类实际上实现的是 public interface BlockingDeque<E>extends BlockingQueue<E>, Deque<E>
这个接口是BlockingQueue的子接口.
4. BlockingQueue 类接口实现类 用法示例:
(1) 示例1,制作吐司.一台机器具有三个任务,一个制作吐司,一个给吐司抹黄油,一个给抹过黄油的吐司上涂果酱.
建立三个阻塞队列,一个队列中只有刚刚制作好的吐司,一个是抹过黄油的,还有一个是涂过果酱的.
制作吐司的线程将制作好的吐司put到吐司队列中,
抹黄油线程从吐司线程中take()吐司,然后抹黄油,完成后将抹过黄油的吐司放到对应的队列中.
涂果酱线程,从抹过黄油的吐司队列中take(),然后图上果酱,完成后,放入到涂过果酱的吐司队列中.
还有消费者线程,从抹过果酱的吐司队列中take()吐司,然后eat.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
package concurrency; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; class Toast { public enum Status { DRY, BUTTERED, JAMMED } private Status status = Status.DRY; private final int id; public Toast(int idn) { id = idn; } public void butter() { status = Status.BUTTERED; } public void jam() { status = Status.JAMMED; } public Status getStatus() { return status; } public int getId() { return id; } public String toString() { return "Toast " + id + ": " + status; } } class ToastQueue extends LinkedBlockingQueue<Toast> { } class Toaster implements Runnable { private ToastQueue toastQueue; private int count = 0; private Random rand = new Random(47); public Toaster(ToastQueue tq) { toastQueue = tq; } public void run() { try { while (!Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500)); // Make toast Toast t = new Toast(count++); System.out.println(t); // Insert into queue toastQueue.put(t); } } catch (InterruptedException e) { System.out.println("Toaster interrupted"); } System.out.println("Toaster off"); } } // Apply butter to toast: class Butterer implements Runnable { private ToastQueue dryQueue, butteredQueue; public Butterer(ToastQueue dry, ToastQueue buttered) { dryQueue = dry; butteredQueue = buttered; } public void run() { try { while (!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = dryQueue.take(); t.butter(); System.out.println(t); butteredQueue.put(t); } } catch (InterruptedException e) { System.out.println("Butterer interrupted"); } System.out.println("Butterer off"); } } // Apply jam to buttered toast: class Jammer implements Runnable { private ToastQueue butteredQueue, finishedQueue; public Jammer(ToastQueue buttered, ToastQueue finished) { butteredQueue = buttered; finishedQueue = finished; } public void run() { try { while (!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = butteredQueue.take(); t.jam(); System.out.println(t); finishedQueue.put(t); } } catch (InterruptedException e) { System.out.println("Jammer interrupted"); } System.out.println("Jammer off"); } } // Consume the toast: class Eater implements Runnable { private ToastQueue finishedQueue; private int counter = 0; public Eater(ToastQueue finished) { finishedQueue = finished; } public void run() { try { while (!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = finishedQueue.take(); // Verify that the toast is coming in order, // and that all pieces are getting jammed: if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) { System.out.println(">>>> Error: " + t); System.exit(1); } else System.out.println("Chomp! " + t); } } catch (InterruptedException e) { System.out.println("Eater interrupted"); } System.out.println("Eater off"); } } public class ToastOMatic { public static void main(String[] args) throws Exception { ToastQueue dryQueue = new ToastQueue(), butteredQueue = new ToastQueue(), finishedQueue = new ToastQueue(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Toaster(dryQueue)); exec.execute(new Butterer(dryQueue, butteredQueue)); //exec.execute(new Butterer(dryQueue, butteredQueue)); 可以使用2个抹黄油线程 exec.execute(new Jammer(butteredQueue, finishedQueue)); exec.execute(new Eater(finishedQueue)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } } |
(2) 示例2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
package concurrency; public class LiftOff implements Runnable { protected int countDown = 10; // Default private static int taskCount = 0; private final int id = taskCount++; public LiftOff() { } public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; } public void run() { while (countDown-- > 0) { System.out.print(status()); Thread.yield(); } System.out.println(""); } } package concurrency; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; class LiftOffRunner implements Runnable { private BlockingQueue<LiftOff> rockets; public LiftOffRunner(BlockingQueue<LiftOff> queue) { rockets = queue; } public void add(LiftOff lo) { try { rockets.put(lo); } catch (InterruptedException e) { System.out.println("Interrupted during put()"); } } public void run() { try { while (!Thread.interrupted()) { LiftOff rocket = rockets.take(); rocket.run(); // Use this thread } } catch (InterruptedException e) { System.out.println("Waking from take()"); } System.out.println("Exiting LiftOffRunner"); } } public class TestBlockingQueues { static void getkey() { try { // Compensate for Windows/Linux difference in the // length of the result produced by the Enter key: new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (java.io.IOException e) { throw new RuntimeException(e); } } static void getkey(String message) { System.out.println(message); getkey(); } static void test(String msg, BlockingQueue<LiftOff> queue) { System.out.println(msg); LiftOffRunner runner = new LiftOffRunner(queue); Thread t = new Thread(runner); t.start(); for (int i = 0; i < 5; i++) runner.add(new LiftOff(5)); getkey("Press 'Enter' (" + msg + ")"); t.interrupt(); System.out.println("Finished " + msg + " test"); } //执行main的线程调用add方法,往BlockingQueue中插入Runnable,相当与生产者 //在插入之前,已经启动一个新线程,并start.start的时候执行消费者的run方法, //在run方法中,调用BlockingQueue的take()方法,如果Queue为空,则消费者线程阻塞. //这个例子演示了 单生产者和单消费者 的情况 public static void main(String[] args) { test("LinkedBlockingQueue", // 不限制容量的阻塞队列 new LinkedBlockingQueue<LiftOff>()); test("ArrayBlockingQueue", // 限制容量为3个的阻塞队列 new ArrayBlockingQueue<LiftOff>(3)); test("SynchronousQueue", // 队列容量始终为1 new SynchronousQueue<LiftOff>()); } } |
打个赏呗
微信打赏
支付宝打赏
本文固定链接: https://www.jack-yin.com/coding/thinking-in-java/2130.html | 边城网事