当前位置: 首页 > Thinking in Java > 正文

第21章 – 并发 – 新类库中构件 – Exchanger

第21章 – 并发 – 新类库中构件 – Exchanger

 

1. Exchanger 简介

 

   Exchanger 是一个很有意思的类.该类是一个泛型类,在定义该类型的变量时需要指定一种类型.

   public class Exchanger<V>extends Object 这个是该类的定义.

 

   构造该类型的对象可用 Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();

 

   这样的方式 定义的Exchanger对象 就和一个List<Integer> (一个存放整形的List)关联起来了.

 

   Exchanger<V> 有个泛型方法:

 

    V exchange(V x) 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。

 

  方法里面的类型V 就是构造 Exchanger<V>类型对象时指定的List<Integer>.

 

  构造好的Exchanger<List<Integer>> exchanger 对象可以在两个线程中(线程A,线程B)调用 V exchange(V x) 方法.

 

  假如在线程A中调用这个V exchange(V x)方法时, 线程B还没有到达调用这个方法的位置,则线程A将暂时阻塞,以等待线程B也调用该方法.

  直到线程B也到达调用V exchange(V x)方法时,

  线程A,和线程B中对这两个方法的调用一起返回,并且返回值是对方线程中调用这个V exchange(V x)方法时传递的参数V x.

  而且,两个线程中的传递的参数V x对于彼此间是独立的, 这样就实现了 两个线程安全的交换数据.

 

  注意,Exchanger只能用于两个线程之间安全的交换数据.

 

2. 示例代码:

 

   生产者线程往List<Integer> 中填入小于10000的随机整数,List的size()是随机生成的大于0.根据随机生成的list.size()填满list后

   调用list = exchanger.exchange(list); //这里将填满的list交给consumer,并从试图consumer获取空的list.

   获取空的list后填满,然后再次调用list = exchanger.exchange(list);

   如此循环

 

   消费者线程先调用list = exchanger.exchange(list);//这里consumer先调用,如果producer没准备好,则线程暂时在这里阻塞

   获取到生产者线程中填满的list后打印每一个元素,然后清空list,再次循环调用list = exchanger.exchange(list);

 package concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Producer implements Runnable
{
  List<Integer>            list      = new ArrayList<Integer>();
  Exchanger<List<Integer>> exchanger = null;
  private Random           rand      = new Random(1001);
  private Random           randInt   = new Random(999);
  private int count = 1;

  public Producer(Exchanger<List<Integer>> exchanger)
  {
    this.exchanger = exchanger;
  }

  @Override
  public void run()
  {
    try
    {
      while (!Thread.interrupted())
      {
        int iTmp = 0;
        while(iTmp == 0)
        {
          iTmp = rand.nextInt(10);
        }
        //获得新的list的容量,不能为0
        for (int i = 0; i < iTmp; i++)
        {
          list.add(randInt.nextInt(10000));
        }
        System.out.println("第 "+ (count++) + " 轮,Producer 中的 list = " + list);
        
        list = exchanger.exchange(list); 
        //这里将add过的list交给consumer,并从consumer获取空的list
      }
    }
    catch (InterruptedException e)
    {
      System.out.println("Producer 中断");
    }
  }
}

class Consumer implements Runnable
{
  List<Integer>            list      = new ArrayList<Integer>();
  Exchanger<List<Integer>> exchanger = null;
  private int count = 1;

  public Consumer(Exchanger<List<Integer>> exchanger)
  {
    this.exchanger = exchanger;
  }

  @Override
  public void run()
  {
    try
    {
      while (!Thread.interrupted())
      {
        list = exchanger.exchange(list); 
        //这里consumer先调用,如果producer没准备好,则线程暂时在这里阻塞
        
        int iTmp = list.size();
        System.out.print("第 "+ (count++) + " 轮,Consumer 中的 list = " );
        System.out.print("[");
        
        for (int i = 0; i < iTmp; i++)
        {
          System.out.print(list.get(i)); 
          //每次都出去list的最后一个元素,最终将list清空
          if(i < iTmp - 1) System.out.print(", ");
          
        }
        System.out.println("]");
        System.out.println(" ");
        list.clear();//consumer处理完之后将list清空
      }
    }
    catch (InterruptedException e)
    {
      System.out.println("Consumer 中断");
    }
  }
}

public class TestExchanger
{

  public static void main(String[] args)
  {
    Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
    // 初始化一个Exchanger,然后传递给两个线程后,启动两个线程

    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(new Consumer(exchanger));
    exec.execute(new Producer(exchanger));

    try
    {
      TimeUnit.MILLISECONDS.sleep(10);
    }
    catch (InterruptedException e)
    {
      System.out.println("main()线程中断异常");

    }
    exec.shutdownNow(); // 运行10毫秒后,终止生产者线程和消费者线程
  }

}

 

赞 赏

   微信赞赏  支付宝赞赏


本文固定链接: https://www.jack-yin.com/coding/thinking-in-java/2124.html | 边城网事

该日志由 边城网事 于2015年03月18日发表在 Thinking in Java 分类下, 你可以发表评论,并在保留原文地址及作者的情况下引用到你的网站或博客。
原创文章转载请注明: 第21章 – 并发 – 新类库中构件 – Exchanger | 边城网事
关键字: ,

第21章 – 并发 – 新类库中构件 – Exchanger 暂无评论

发表评论

快捷键:Ctrl+Enter