サンプルコード

Master-Worker Pattern

Masterクラス

/**
 * Coordinator of the Master-Worker pattern.
 *
 * <p>Holds the queue of pending tasks, the map of finished results and the
 * worker threads. Every thread is created around the single {@link Worker}
 * instance passed to the constructor.
 */
public class Master {

    /** Pending tasks; polled by the worker threads. */
    private Queue<Object> taskQueue = new ConcurrentLinkedQueue<>();

    /** Worker threads keyed by their name (the worker index as a string). */
    private Map<String, Thread> workerMap = new HashMap<>();

    /** Results written by the workers. */
    private Map<String, Object> resultMap = new ConcurrentHashMap<>();

    /**
     * Wires the worker to this master's queue and result map and creates
     * (but does not start) the worker threads.
     *
     * @param worker the task handler run by every thread
     * @param count  number of threads to create
     */
    public Master(Worker worker, int count) {
        worker.setTaskQueue(this.taskQueue);
        worker.setResultMap(this.resultMap);

        for (int index = 0; index < count; index++) {
            String threadName = Integer.toString(index);
            Thread thread = new Thread(worker, threadName);
            this.workerMap.put(threadName, thread);
        }
    }

    /**
     * Reports whether every worker thread has terminated.
     *
     * @return {@code true} only if all threads are in state {@code TERMINATED}
     */
    public boolean isComplete() {
        return this.workerMap.values().stream()
                .allMatch(thread -> thread.getState() == Thread.State.TERMINATED);
    }

    /**
     * Adds a task to the pending queue.
     *
     * @param job the task to enqueue
     */
    public void submit(Object job) {
        this.taskQueue.add(job);
    }

    /**
     * Returns the live result map itself (not a copy).
     *
     * @return the map the workers write their results into
     */
    public Map<String, Object> getResultMap() {
        return this.resultMap;
    }

    /** Starts every worker thread. */
    public void execute() {
        this.workerMap.values().forEach(Thread::start);
    }
}

Workerクラス

/**
 * Base worker of the Master-Worker pattern.
 *
 * <p>Takes tasks from the task queue, passes each one to
 * {@link #handle(Object)} and stores the outcome in the result map under the
 * task's {@code toString()} value. The queue and the result map must be
 * injected through the setters before {@link #run()} is invoked.
 */
public class Worker implements Runnable {

    /** Source of pending tasks. */
    private Queue<Object> taskQueue;

    /** Destination for finished results. */
    private Map<String, Object> resultMap;

    /**
     * Sets the queue this worker polls tasks from.
     *
     * @param taskQueue the queue of pending tasks
     */
    public void setTaskQueue(Queue<Object> taskQueue) {
        this.taskQueue = taskQueue;
    }

    /**
     * Sets the map this worker writes results into.
     *
     * @param resultMap the map receiving one entry per processed task
     */
    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /**
     * Processes a single task. The default implementation returns the input
     * unchanged; the concrete logic is left to subclasses.
     *
     * @param input the task taken from the queue
     * @return the result to store for that task
     */
    public Object handle(Object input) {
        return input;
    }

    /**
     * Processes tasks until the queue yields {@code null}, i.e. until it is
     * found empty, then returns.
     */
    @Override
    public void run() {
        Object input;
        while ((input = this.taskQueue.poll()) != null) {
            Object result = this.handle(input);
            this.resultMap.put(input.toString(), result);
        }
    }
}

/**
 * Worker whose tasks are integers; the result of a task is its square.
 */
public class ConcreteWorker extends Worker {

    /**
     * Squares the given task.
     *
     * @param input the task; cast to {@code Integer}
     * @return the input multiplied by itself
     */
    @Override
    public Object handle(Object input) {
        int value = (Integer) input;
        return value * value;
    }
}

利用側

// Client code: square the numbers 0..9 on three worker threads and sum the squares.
Master m = new Master(new ConcreteWorker(), 3);

// Register the tasks before starting the workers.
for (int i = 0; i < 10; i++) {
    m.submit(i);
}

m.execute();

int result = 0;
Map<String, Object> resultMap = m.getResultMap();

// Sum results as they arrive, without waiting for all tasks to finish.
// Completion is tested BEFORE emptiness on purpose: if the map were checked
// first, a worker could store its last result and terminate between the two
// checks, and the loop would exit with that result still unconsumed.
while (!m.isComplete() || !resultMap.isEmpty()) {
    // Consume at most one entry per pass, then re-evaluate the loop condition.
    for (String key : resultMap.keySet()) {
        // remove() hands back the stored value, so no separate get() is needed.
        Integer value = (Integer) resultMap.remove(key);
        if (value != null) {
            result += value;
        }
        break;
    }
}

System.out.println(result);

Scatter-Gather Pattern

RxJavaによる実装

// Fixed pool of five threads, handed to every generated task.
ExecutorService executors = Executors.newFixedThreadPool(5);

// Scatter: one Observable per index 0..9.
List<Observable<String>> obs = IntStream.range(0, 10)
        .mapToObj(i -> generateTask(i, executors))
        .collect(Collectors.toList());

// Gather: merge the observables and collect everything they emit into one list.
Observable<List<String>> merged = Observable.merge(obs).toList();

// Take the first (and, after toList(), presumably only) emitted list via the blocking view.
List<String> result = merged.toBlocking().first();

...

/**
 * Builds an Observable that calls {@code Util.delay(2000)}, emits
 * {@code i + "-test"} and then completes, subscribed on a scheduler backed
 * by the given executor.
 *
 * @param i               index used to build the emitted string
 * @param executorService executor wrapped by the subscription scheduler
 * @return the Observable for this task
 */
private Observable<String> generateTask(int i, ExecutorService executorService) {
    Observable<String> task = Observable.<String>create(subscriber -> {
        Util.delay(2000);
        subscriber.onNext( i + "-test");
        subscriber.onCompleted();
    });
    return task.subscribeOn(Schedulers.from(executorService));
}

Spring Reactor Coreによる実装

// Fixed pool of five threads, handed to every generated task.
ExecutorService executors = Executors.newFixedThreadPool(5);

// Scatter: one Flux per index 0..9.
List<Flux<String>> fluxList = IntStream.range(0, 10)
        .mapToObj(i -> generateTask(executors, i))
        .collect(Collectors.toList());

// Gather: merge the fluxes and collect everything they emit into one list.
Mono<List<String>> merged = Flux.merge(fluxList).toList();

// NOTE(review): get() presumably waits for the list; toList()/get() look like
// an older Reactor Core API - confirm against the version in use.
List<String> list = merged.get();

...

/**
 * Builds a Flux that calls {@code Util.delay(2000)}, emits
 * {@code i + "-test"} and then completes, subscribed on the given executor.
 *
 * @param executorService executor passed to {@code subscribeOn}
 * @param i               index used to build the emitted string
 * @return the Flux for this task
 */
public Flux<String> generateTask(ExecutorService executorService, int i) {
    Flux<String> task = Flux.<String>create(subscriber -> {
        Util.delay(2000);
        subscriber.onNext(i + "-test");
        subscriber.onComplete();
    });
    return task.subscribeOn(executorService);
}