Java 并发编程模式之 Master – Worker模式
Apr112018
1. 角色
1) Master
Master中维护一个job Queue,是任务的描述,这个jobQueue会给每一个Worker共享,因此需要是线程安全的
Master中同时维护一个resultMap用于保存每一个job的处理结果。
2) Worker
Worker是一个runnable, 在其执行run方法之前需要显示的调用setJobQueue和setResultMap方法
将Master中的jobQueue和resultMap设置好,然后从jobQueue中获取一个job,传递给public Object handle(Object job),
并将执行结果保存到ResultMap中去。
这里 显然,每一个job都是由同一个handle方法处理的。
3) job
一个job对象表示描述一个Job所需的参数,比如实例中需要计算1-100这100个整数的立法之和,
则每个job需要做的事情就是计算一个整数的立方,所以描述job的对象只需要是这个整数即可。
4) Client
Client是客户端,调用Master和Worker处理实际问题。
2. 示例代码
//Master import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { // 任务队列 protected Queue<Object> jobQueue = new ConcurrentLinkedQueue<Object>(); // Worker线程队列 protected Map<String, Thread> threadMap = new HashMap<String, Thread>(); // 子任务处理结果集 protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); // 是否所有的子任务都结束了 public boolean isComplete() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } // Master的构造,需要一个Worker进程逻辑,和需要的Worker进程数量 public Master(Worker worker, int countWorker) { worker.setJobQueue(jobQueue); worker.setResultMap(resultMap); for (int i = 0; i < countWorker; i++) threadMap.put("worker-" + i, new Thread(worker, "worker-" + i)); } // 提交一个任务 public void submit(Object job) { jobQueue.add(job); } // 返回子任务结果集 public Map<String, Object> getResultMap() { return resultMap; } // 开始运行所有的Worker进程,进行处理 public void execute() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { entry.getValue().start(); } } }
//Worker import java.util.Map; import java.util.Queue; public class Worker implements Runnable { // 任务队列,用于取得子任务,这个是在master中管理的 protected Queue<Object> workQueue; // 子任务处理结果集,也是在master中生成的 protected Map<String, Object> resultMap; public void setJobQueue(Queue<Object> workQueue) { this.workQueue = workQueue; } public void setResultMap(Map<String, Object> resultMap) { this.resultMap = resultMap; } // 子任务处理的逻辑,在子类中实现具体逻辑 public Object handle(Object job) { return job; } @Override public void run() { while (true) { // 获取子任务 Object job = workQueue.poll(); if (job == null) break; // 处理子任务 Object re = handle(job); // 将处理结果写入结果集 resultMap.put(Integer.toString(job.hashCode()), re); } } }
//Worker子类 public class PlusWorker extends Worker { @Override public Object handle(Object job) { int result = (Integer) job; result = result * result * result; return result; } }
//Client package test.concurrency.pattern.master_worker; import java.util.Map; import java.util.Set; public class Client { public static void main(String[] args) { Master m = new Master(new PlusWorker(), 2); // 只能一次性的submit所有任务 for (int i = 0; i < 4; i++) { m.submit(i); } // 然后开始执行 m.execute(); int re = 0; Map<String, Object> resultMap = m.getResultMap(); while (resultMap.size() > 0 || !m.isComplete()) { Set<String> keys = resultMap.keySet(); String key = null; for (String k : keys) { key = k; break; } Integer i = null; if (key != null) { i = (Integer) resultMap.get(key); } if (i != null) { re += i; } if (key != null) { resultMap.remove(key); } } System.out.println("testMasterWorker:" + re); } }
赞 赏
微信赞赏 支付宝赞赏
本文固定链接: https://www.jack-yin.com/coding/java-thread/2674.html | 边城网事