第21章 – 并发 – 新类库中构件 – Exchanger
第21章 – 并发 – 新类库中构件 – Exchanger
1. Exchanger 简介
Exchanger 是一个很有意思的类.该类是一个泛型类,在定义该类型的变量时需要指定一种类型.
public class Exchanger<V>extends Object 这个是该类的定义.
构造该类型的对象可用 Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
这样的方式 定义的Exchanger对象 就和一个List<Integer> (一个存放整形的List)关联起来了.
Exchanger<V> 有个泛型方法:
V exchange(V x) 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
方法里面的类型V 就是构造 Exchanger<V>类型对象时指定的List<Integer>.
构造好的Exchanger<List<Integer>> exchanger 对象可以在两个线程中(线程A,线程B)调用 V exchange(V x) 方法.
假如在线程A中调用这个V exchange(V x)方法时, 线程B还没有到达调用这个方法的位置,则线程A将暂时阻塞,以等待线程B也调用该方法.
直到线程B也到达调用V exchange(V x)方法时,
线程A,和线程B中对这两个方法的调用一起返回,并且返回值是对方线程中调用这个V exchange(V x)方法时传递的参数V x.
而且,两个线程中的传递的参数V x对于彼此间是独立的, 这样就实现了 两个线程安全的交换数据.
注意,Exchanger只能用于两个线程之间安全的交换数据.
2. 示例代码:
生产者线程往List<Integer> 中填入小于10000的随机整数,List的size()是随机生成的大于0.根据随机生成的list.size()填满list后
调用list = exchanger.exchange(list); //这里将填满的list交给consumer,并从试图consumer获取空的list.
获取空的list后填满,然后再次调用list = exchanger.exchange(list);
如此循环
消费者线程先调用list = exchanger.exchange(list);//这里consumer先调用,如果producer没准备好,则线程暂时在这里阻塞
获取到生产者线程中填满的list后打印每一个元素,然后清空list,再次循环调用list = exchanger.exchange(list);
package concurrency; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; class Producer implements Runnable { List<Integer> list = new ArrayList<Integer>(); Exchanger<List<Integer>> exchanger = null; private Random rand = new Random(1001); private Random randInt = new Random(999); private int count = 1; public Producer(Exchanger<List<Integer>> exchanger) { this.exchanger = exchanger; } @Override public void run() { try { while (!Thread.interrupted()) { int iTmp = 0; while(iTmp == 0) { iTmp = rand.nextInt(10); } //获得新的list的容量,不能为0 for (int i = 0; i < iTmp; i++) { list.add(randInt.nextInt(10000)); } System.out.println("第 "+ (count++) + " 轮,Producer 中的 list = " + list); list = exchanger.exchange(list); //这里将add过的list交给consumer,并从consumer获取空的list } } catch (InterruptedException e) { System.out.println("Producer 中断"); } } } class Consumer implements Runnable { List<Integer> list = new ArrayList<Integer>(); Exchanger<List<Integer>> exchanger = null; private int count = 1; public Consumer(Exchanger<List<Integer>> exchanger) { this.exchanger = exchanger; } @Override public void run() { try { while (!Thread.interrupted()) { list = exchanger.exchange(list); //这里consumer先调用,如果producer没准备好,则线程暂时在这里阻塞 int iTmp = list.size(); System.out.print("第 "+ (count++) + " 轮,Consumer 中的 list = " ); System.out.print("["); for (int i = 0; i < iTmp; i++) { System.out.print(list.get(i)); //每次都出去list的最后一个元素,最终将list清空 if(i < iTmp - 1) System.out.print(", "); } System.out.println("]"); System.out.println(" "); list.clear();//consumer处理完之后将list清空 } } catch (InterruptedException e) { System.out.println("Consumer 中断"); } } } public class TestExchanger { public static void main(String[] args) { Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>(); // 初始化一个Exchanger,然后传递给两个线程后,启动两个线程 ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Consumer(exchanger)); exec.execute(new Producer(exchanger)); try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { System.out.println("main()线程中断异常"); } exec.shutdownNow(); // 运行10毫秒后,终止生产者线程和消费者线程 } }
赞 赏
微信赞赏 支付宝赞赏
本文固定链接: https://www.jack-yin.com/coding/thinking-in-java/2124.html | 边城网事