サンプルコード
Master-Worker Pattern
Masterクラス
/**
 * Coordinator of the Master-Worker pattern.
 *
 * <p>Holds the queue of pending jobs, the worker threads, and the map the
 * workers write their results into. Every thread is created around the single
 * {@link Worker} instance passed to the constructor.
 */
public class Master {
    /** Jobs waiting to be picked up by the worker threads. */
    private Queue<Object> taskQueue = new ConcurrentLinkedQueue<>();
    /** Worker threads keyed by their index rendered as a string ("0", "1", ...). */
    private Map<String, Thread> workerMap = new HashMap<>();
    /** Results published by the workers. */
    private Map<String, Object> resultMap = new ConcurrentHashMap<>();

    /**
     * Wires the worker to this master's queue and result map and prepares
     * (but does not start) {@code count} threads that all run that worker.
     *
     * @param worker the task handler shared by every thread
     * @param count  number of threads to create
     */
    public Master(Worker worker, int count) {
        worker.setTaskQueue(taskQueue);
        worker.setResultMap(resultMap);
        int index = 0;
        while (index < count) {
            String threadName = String.valueOf(index);
            workerMap.put(threadName, new Thread(worker, threadName));
            index++;
        }
    }

    /**
     * Reports whether every worker thread has reached the TERMINATED state.
     *
     * @return {@code true} only when no thread is in any other state
     */
    public boolean isComplete() {
        return workerMap.values().stream()
                .allMatch(thread -> thread.getState() == Thread.State.TERMINATED);
    }

    /**
     * Appends a job to the task queue.
     *
     * @param job the job to enqueue
     */
    public void submit(Object job) {
        taskQueue.add(job);
    }

    /**
     * Exposes the live result map (not a copy).
     *
     * @return the map the workers write into
     */
    public Map<String, Object> getResultMap() {
        return resultMap;
    }

    /** Starts every prepared worker thread. */
    public void execute() {
        workerMap.values().forEach(Thread::start);
    }
}
Workerクラス
/**
 * Worker side of the Master-Worker pattern.
 *
 * <p>Drains the shared task queue, passes each task to {@link #handle(Object)}
 * and stores the outcome in the shared result map under the task's
 * {@code toString()} value. Subclasses override {@code handle} to supply the
 * real processing logic.
 */
public class Worker implements Runnable {
    /** Queue the tasks are polled from; injected via {@link #setTaskQueue(Queue)}. */
    private Queue<Object> taskQueue;
    /** Map the results are written to; injected via {@link #setResultMap(Map)}. */
    private Map<String, Object> resultMap;

    /**
     * Sets the queue this worker takes its tasks from.
     *
     * @param taskQueue the shared task queue
     */
    public void setTaskQueue(Queue<Object> taskQueue) {
        this.taskQueue = taskQueue;
    }

    /**
     * Sets the map this worker publishes its results to.
     *
     * @param resultMap the shared result map
     */
    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /**
     * Processes a single task. The default implementation returns the input
     * unchanged; the concrete logic is left to subclasses.
     *
     * @param input the task taken from the queue
     * @return the result to store for that task
     */
    public Object handle(Object input) {
        return input; // concrete logic is delegated to subclasses
    }

    /**
     * Polls tasks until the queue reports empty, storing each result under
     * {@code input.toString()}.
     */
    @Override
    public void run() {
        while (true) {
            Object input = this.taskQueue.poll();
            if (input == null) {
                // poll() returns null when the queue is empty: nothing left to do, so stop.
                break;
            }
            Object result = this.handle(input);
            this.resultMap.put(input.toString(), result);
        }
    }
}
/**
 * Worker whose task handling squares an {@link Integer} task.
 */
public class ConcreteWorker extends Worker {
    /**
     * Squares the given task value.
     *
     * @param input the task; cast to {@code Integer} without a type check
     * @return the square of {@code input}, boxed as an {@code Integer}
     */
    @Override
    public Object handle(Object input) {
        int value = ((Integer) input).intValue();
        return value * value;
    }
}
利用側
// Client side: square 0..9 on three worker threads and print the sum of the squares.
Master m = new Master(new ConcreteWorker(), 3);
// Enqueue every task before starting the workers: a worker stops as soon as
// it finds the queue empty.
for (int i = 0; i < 10; i++) {
    m.submit(i);
}
m.execute();
int result = 0;
Map<String, Object> resultMap = m.getResultMap();
// Sum results as they arrive instead of waiting for all tasks to finish.
// isComplete() must be evaluated BEFORE the emptiness check: once every worker
// has terminated no further result can appear, so an empty map seen afterwards
// really means everything was consumed. With the operands the other way round,
// a worker could store its last result and terminate between the two checks,
// and the loop would exit with that result still unsummed.
// NOTE: this loop busy-polls while the workers are running.
while (!m.isComplete() || !resultMap.isEmpty()) {
    // Pick any one key currently present; the map may be empty at this moment.
    String key = null;
    for (String k : resultMap.keySet()) {
        key = k;
        break;
    }
    if (key != null) {
        // remove() returns the stored value, so reading and deleting is one map operation.
        Integer value = (Integer) resultMap.remove(key);
        if (value != null) {
            result += value;
        }
    }
}
System.out.println(result);
Scatter-Gather pattern
RxJavaによる実装
// Scatter: create ten task observables (indices 0..9) that share a five-thread pool.
ExecutorService executors = Executors.newFixedThreadPool(5);
List<Observable<String>> obs = IntStream.range(0, 10)
        .mapToObj(i -> generateTask(i, executors))
        .collect(Collectors.toList());
// Gather: merge all task observables and collect every emitted value into one list.
Observable<List<String>> merged = Observable.merge(obs).toList();
// Switch to the blocking view and take the first (only) emitted list.
List<String> result = merged.toBlocking().first();
...
/**
 * Builds an observable that emits the single value {@code i + "-test"} and
 * then completes, subscribed on a scheduler backed by the given executor.
 *
 * @param i               index used to build the emitted string
 * @param executorService executor the subscription is scheduled on
 * @return the task observable
 */
private Observable<String> generateTask(int i, ExecutorService executorService) {
    Observable<String> task = Observable.<String>create(subscriber -> {
        Util.delay(2000); // presumably simulates ~2 s of work; confirm against Util
        subscriber.onNext(i + "-test");
        subscriber.onCompleted();
    });
    return task.subscribeOn(Schedulers.from(executorService));
}
Spring Reactor Coreによる実装
// Scatter: create ten task fluxes (indices 0..9) that share a five-thread pool.
ExecutorService executors = Executors.newFixedThreadPool(5);
List<Flux<String>> fluxList = IntStream.range(0, 10)
        .mapToObj(i -> generateTask(executors, i))
        .collect(Collectors.toList());
// Gather: merge all task fluxes and collect every emitted value into one list.
Mono<List<String>> merged = Flux.merge(fluxList).toList();
// Obtain the collected list from the Mono.
List<String> list = merged.get();
...
/**
 * Builds a flux that emits the single value {@code i + "-test"} and then
 * completes, subscribed on the given executor.
 *
 * @param executorService executor the subscription runs on
 * @param i               index used to build the emitted string
 * @return the task flux
 */
public Flux<String> generateTask(ExecutorService executorService, int i) {
    Flux<String> task = Flux.<String>create(subscriber -> {
        Util.delay(2000); // presumably simulates ~2 s of work; confirm against Util
        subscriber.onNext(i + "-test");
        subscriber.onComplete();
    });
    return task.subscribeOn(executorService);
}