当前位置: 首页 > Java多线程 > 正文

Java 并发编程模式之 Master – Worker模式

目 录
 [ 隐藏 ]

1. 角色 

1) Master 

Master中维护一个job Queue,是任务的描述,这个jobQueue会给每一个Worker共享,因此需要是线程安全的 
Master中同时维护一个resultMap用于保存每一个job的处理结果。 

2) Worker 

Worker是一个runnable, 在其执行run方法之前需要显示的调用setJobQueue和setResultMap方法 
将Master中的jobQueue和resultMap设置好,然后从jobQueue中获取一个job,传递给public Object handle(Object job), 
并将执行结果保存到ResultMap中去。 

这里 显然,每一个job都是由同一个handle方法处理的。 

3) job 

一个job对象表示描述一个Job所需的参数,比如实例中需要计算1-100这100个整数的立法之和, 
则每个job需要做的事情就是计算一个整数的立方,所以描述job的对象只需要是这个整数即可。 

4) Client 

Client是客户端,调用Master和Worker处理实际问题。

2. 示例代码 

//Master 
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {
	// 任务队列
	protected Queue<Object> jobQueue = new ConcurrentLinkedQueue<Object>();
	// Worker线程队列
	protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
	// 子任务处理结果集
	protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

	// 是否所有的子任务都结束了
	public boolean isComplete() {
		for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
			if (entry.getValue().getState() != Thread.State.TERMINATED) {
				return false;
			}
		}
		return true;
	}

	// Master的构造,需要一个Worker进程逻辑,和需要的Worker进程数量
	public Master(Worker worker, int countWorker) {
		worker.setJobQueue(jobQueue);
		worker.setResultMap(resultMap);
		for (int i = 0; i < countWorker; i++)
			threadMap.put("worker-" + i, new Thread(worker, "worker-" + i));
	}

	// 提交一个任务
	public void submit(Object job) {
		jobQueue.add(job);
	}

	// 返回子任务结果集
	public Map<String, Object> getResultMap() {
		return resultMap;
	}

	// 开始运行所有的Worker进程,进行处理
	public void execute() {
		for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
			entry.getValue().start();
		}
	}
}
//Worker 
import java.util.Map;
import java.util.Queue;

public class Worker implements Runnable {
	// 任务队列,用于取得子任务,这个是在master中管理的
	protected Queue<Object> workQueue;
	// 子任务处理结果集,也是在master中生成的
	protected Map<String, Object> resultMap;

	public void setJobQueue(Queue<Object> workQueue) {
		this.workQueue = workQueue;
	}

	public void setResultMap(Map<String, Object> resultMap) {
		this.resultMap = resultMap;
	}

	// 子任务处理的逻辑,在子类中实现具体逻辑
	public Object handle(Object job) {
		return job;
	}

	@Override
	public void run() {
		while (true) {
			// 获取子任务
			Object job = workQueue.poll();
			if (job == null)
				break;
			// 处理子任务
			Object re = handle(job);
			// 将处理结果写入结果集
			resultMap.put(Integer.toString(job.hashCode()), re);
		}
	}
}
//Worker子类 
public class PlusWorker extends Worker {

	@Override
	public Object handle(Object job) {
		int result = (Integer) job;
		result = result * result * result;
		return result;
	}

}
//Client 
package test.concurrency.pattern.master_worker;

import java.util.Map;
import java.util.Set;

public class Client {
	public static void main(String[] args) {
		Master m = new Master(new PlusWorker(), 2);

		// 只能一次性的submit所有任务
		for (int i = 0; i < 4; i++) {
			m.submit(i);
		}

		// 然后开始执行
		m.execute();

		int re = 0;

		Map<String, Object> resultMap = m.getResultMap();
		while (resultMap.size() > 0 || !m.isComplete()) {
			Set<String> keys = resultMap.keySet();
			String key = null;
			for (String k : keys) {
				key = k;
				break;
			}
			Integer i = null;
			if (key != null) {
				i = (Integer) resultMap.get(key);
			}
			if (i != null) {
				re += i;
			}
			if (key != null) {
				resultMap.remove(key);
			}
		}
		System.out.println("testMasterWorker:" + re);
	}
}

 

赞 赏

   微信赞赏  支付宝赞赏


本文固定链接: https://www.jack-yin.com/coding/java-thread/2674.html | 边城网事

该日志由 边城网事 于2018年04月11日发表在 Java多线程 分类下, 你可以发表评论,并在保留原文地址及作者的情况下引用到你的网站或博客。
原创文章转载请注明: Java 并发编程模式之 Master – Worker模式 | 边城网事

Java 并发编程模式之 Master – Worker模式 暂无评论

发表评论

快捷键:Ctrl+Enter