Concurrent

Instead of Thread, wait, notify, Hashtable and the like, prefer the utilities under java.util.concurrent.

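・Executor Framework

・Concurrent Collection

  ・ConcurrentHashMap, etc.

・Synchronizer

  ・CountDownLatch: split one large task into several small tasks and merge the results once every small task has finished. Not reusable (the count cannot be reset).

  ・Semaphore: limits the number of threads that run at the same time.

  ・CyclicBarrier: keeps every participating thread waiting until all of them have reached the barrier, then lets them continue. Reusable (it resets after each trip; reset() forces a reset).

  ・Exchanger: exchanges data between two threads.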
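
The Semaphore item above can be sketched as follows. This is a minimal illustration, not part of the original notes: the class SemaphoreDemo, the method peakConcurrency, and the 20 ms simulated work are hypothetical names and values chosen for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreDemo {

    // Runs `tasks` jobs on `tasks` threads, but lets at most `permits` of them
    // be inside the guarded section at once. Returns the highest concurrency seen.
    static int peakConcurrency(int tasks, int permits) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire(); // blocks while all permits are taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    Thread.sleep(20); // simulated work inside the guarded section
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    semaphore.release(); // always hand the permit back
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The printed peak never exceeds the number of permits (3 here).
        System.out.println("peak concurrency: " + peakConcurrency(8, 3));
    }
}
```

Releasing in finally keeps a failing task from leaking a permit.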

CyclicBarrier CountDownLatch Semaphore

Synchronizer demo
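
As one possible demo of the "reusable" property of CyclicBarrier, the sketch below passes the same barrier several times. CyclicBarrierDemo and runRounds are hypothetical names; the barrier action simply counts how often the barrier tripped.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class CyclicBarrierDemo {

    // Starts `parties` threads that each pass the same barrier `rounds` times.
    // The barrier action runs once per trip, so the result equals `rounds`.
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // blocks until all parties have arrived
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party was interrupted or timed out; give up
                }
            }, "worker " + i);
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + runRounds(4, 3) + " times");
    }
}
```

No call to reset() is needed between rounds; the barrier re-arms itself after each trip.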

java.util.concurrent package summary

※ For the collections in the java.util.concurrent package, see the Generics notes.

★Runnable and Callable

public interface Runnable {

void run(); // drawbacks: no return value, cannot throw checked exceptions

}

public interface Callable<V> {

V call() throws Exception;

}
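
The difference between the two interfaces shows up at Future.get(): a Callable can hand back a value, and an exception thrown inside call() arrives wrapped in an ExecutionException. A minimal sketch, with CallableVsRunnable and outcome as hypothetical names:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableVsRunnable {

    // Submits the task and describes what Future.get() produced.
    static String outcome(Callable<Integer> task) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = es.submit(task);
            return "value=" + future.get();
        } catch (ExecutionException e) {
            // The exception thrown inside call() is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(() -> 21 * 2));                          // value=42
        System.out.println(outcome(() -> { throw new Exception("boom"); })); // failed: boom
    }
}
```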

〇Runnable sample

class Calculator implements Runnable{

@Override

public void run() {

// do something

}

}

ExecutorService es = Executors.newSingleThreadExecutor();

es.submit(new Calculator());

es.shutdown();

〇Callable sample

class Calculator implements Callable<Integer>{

private int money;

public Calculator(int money) {

this.money = money;

}

@Override

public Integer call() throws Exception {

TimeUnit.MILLISECONDS.sleep(10000); // 10 seconds

return this.money * 2;

}

}

ExecutorService es = Executors.newSingleThreadExecutor();

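// Approach 1: submit the Callable and use the returned Future

Future<Integer> future = es.submit(new Calculator(100));

// Approach 2 (use instead of approach 1): wrap the Callable in a FutureTask

// FutureTask<Integer> future = new FutureTask<Integer>(new Calculator(100));

// es.submit(future);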

while(!future.isDone()){

TimeUnit.MILLISECONDS.sleep(200);

System.out.print("#");

}

Integer result = future.get(); // throws ExecutionException if the task failed

//future.get(3, TimeUnit.SECONDS); // throws TimeoutException if the result is not ready within 3 seconds

es.shutdown();
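
The timed get() mentioned in the comment above could be used like this. A sketch under stated assumptions: FutureTimeoutDemo, awaitResult, and the value 200 are hypothetical, and the task is cancelled after the timeout so the worker thread does not keep sleeping.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutDemo {

    // Waits at most `waitMillis` for a task that needs `workMillis` to finish.
    static String awaitResult(long workMillis, long waitMillis) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        Future<Integer> future = es.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(workMillis); // simulated slow computation
            return 200;
        });
        try {
            return "result=" + future.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up and interrupt the worker instead of leaving it running.
            boolean cancelled = future.cancel(true);
            return "timed out, cancelled=" + cancelled;
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitResult(10, 5000));  // fast task: result arrives in time
        System.out.println(awaitResult(2000, 100)); // slow task: the wait times out
    }
}
```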

class Task implements Callable<Integer> { ... }

// Style 1

ExecutorService executor = Executors.newCachedThreadPool();

Task task = new Task();

Future<Integer> result = executor.submit(task);

executor.shutdown();

out.println(result.get());

// Style 2

ExecutorService executor = Executors.newCachedThreadPool();

Task task = new Task();

FutureTask<Integer> futureTask = new FutureTask<Integer>(task);

executor.submit(futureTask);

executor.shutdown();

out.println(futureTask.get());

// Style 3

Task task = new Task();

FutureTask<Integer> futureTask = new FutureTask<Integer>(task);

Thread thread = new Thread(futureTask);

thread.start();

out.println(futureTask.get());

★Callable usage example

@EJB

private Caller caller;

@Test

public void testXxx() throws Exception {

caller.call(new Callable<Object>() {

public Object call() throws Exception {

doSomething();

return null;

}

});

}

public static interface Caller {

public <V> V call(Callable<V> callable) throws Exception;

}

@Stateless

@TransactionAttribute(REQUIRES_NEW)

public static class TestBean implements Caller {

public <V> V call(Callable<V> callable) throws Exception {

return callable.call();

}

}

★CountDownLatch notes

Usage 1: wait until all workers have finished

final int COUNT = 10;

final CountDownLatch completeLatch = new CountDownLatch(COUNT);

for (int i = 0; i < COUNT; ++i) {

Thread thread = new Thread("worker " + i) {

public void run() {

...

completeLatch.countDown();

}

};

thread.start();

}

...

completeLatch.await();
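
A runnable variant of usage 1, splitting one large task into slices and merging the results after the latch opens. LatchSum and parallelSum are hypothetical names; countDown() is placed in finally so a failing worker cannot leave the waiting thread blocked forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

class LatchSum {

    // Sums 1..n by giving each of `workers` threads one slice of the range,
    // then waits on the latch until every slice has been added.
    static long parallelSum(int n, int workers) {
        CountDownLatch completeLatch = new CountDownLatch(workers);
        AtomicLong total = new AtomicLong();
        int chunk = (n + workers - 1) / workers; // ceiling division
        for (int w = 0; w < workers; w++) {
            int from = w * chunk + 1;
            int to = Math.min(n, from + chunk - 1);
            new Thread(() -> {
                try {
                    long partial = 0;
                    for (int v = from; v <= to; v++) {
                        partial += v;
                    }
                    total.addAndGet(partial);
                } finally {
                    completeLatch.countDown(); // count down even if the slice fails
                }
            }, "worker " + w).start();
        }
        try {
            completeLatch.await(); // returns once the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(100, 10)); // 5050
    }
}
```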

Usage 2: start all workers at once

final int COUNT = 10;

final CountDownLatch startLatch = new CountDownLatch(1);

for (int i = 0; i < COUNT; ++i) {

Thread thread = new Thread("worker " + i) {

public void run() {

try {

startLatch.await();

} catch (InterruptedException e) {

return;

}

...

}

};

thread.start();

}

...

startLatch.countDown();
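
A runnable sketch of usage 2 that also checks the ordering guarantee: nothing after startLatch.await() runs before the gate is opened. StartGate, runAfterGate, and the gateOpen flag are hypothetical additions for the check.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class StartGate {

    // Parks `count` workers on a one-shot latch, opens the gate, and returns
    // how many workers observed the gate as open when they resumed.
    static int runAfterGate(int count) {
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch doneLatch = new CountDownLatch(count);
        AtomicBoolean gateOpen = new AtomicBoolean(false);
        AtomicInteger ranAfterOpen = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            new Thread(() -> {
                try {
                    startLatch.await(); // every worker waits here
                    if (gateOpen.get()) {
                        ranAfterOpen.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    doneLatch.countDown();
                }
            }, "worker " + i).start();
        }
        gateOpen.set(true);
        startLatch.countDown(); // releases all waiting workers at once
        try {
            doneLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranAfterOpen.get();
    }

    public static void main(String[] args) {
        System.out.println(runAfterGate(10) + " workers ran after the gate opened");
    }
}
```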

The same processing written three ways

Style 1 (simplest)

final CountDownLatch latch = new CountDownLatch(1);

Thread thread = new Thread("worker") {

public void run() {

...

latch.countDown();

}

};

thread.start();

latch.await();

Style 2

final Object completeSignal = new Object();

Thread thread = new Thread("worker") {

public void run() {

...

synchronized (completeSignal) {

completeSignal.notifyAll();

}

}

};

synchronized (completeSignal) {

thread.start();

completeSignal.wait();

}

Style 3

final Lock lock = new ReentrantLock();

final Condition completeSignal = lock.newCondition();

Thread thread = new Thread("worker") {

public void run() {

...

lock.lock();

try {

completeSignal.signalAll();

} finally {

lock.unlock();

}

}

};

lock.lock();

try {

thread.start();

completeSignal.await();

} finally {

lock.unlock();

}
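
One caveat for styles 2 and 3: Object.wait() and Condition.await() may return spuriously, so they are normally called in a loop that re-checks a flag. A minimal sketch of style 2 with such a flag follows; CompletionSignal, markDone, awaitDone, and runAndWait are hypothetical names.

```java
class CompletionSignal {

    private boolean done; // guarded by the monitor of `this`

    synchronized void markDone() {
        done = true;
        notifyAll();
    }

    synchronized void awaitDone() throws InterruptedException {
        // The loop protects against spurious wakeups and against a signal
        // that was sent before the waiter arrived.
        while (!done) {
            wait();
        }
    }

    // Starts a worker, waits for its completion signal, and returns its result.
    static String runAndWait() {
        CompletionSignal signal = new CompletionSignal();
        String[] result = new String[1];
        Thread thread = new Thread(() -> {
            result[0] = "finished"; // stands in for the real work
            signal.markDone();
        }, "worker");
        // The thread may be started outside the lock: the flag remembers the signal.
        thread.start();
        try {
            signal.awaitDone();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(runAndWait());
    }
}
```

CountDownLatch (style 1) keeps this state internally, which is why it is the simplest of the three.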

★CompletionService

Results are retrieved in the order in which the tasks complete, not the order in which they were submitted.

ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);

CompletionService<Integer> completionService

= new ExecutorCompletionService<Integer>(newFixedThreadPool);

for(int i = 0; i <= 10; i++){

final int task = i;

completionService.submit(new Callable<Integer>() {

public Integer call() throws Exception {

Thread.sleep(new Random().nextInt(3000)); // up to 3 seconds

return task;

}

});

}

for(int i = 0; i <= 10; i++){

out.println("Task completed:" + completionService.take().get());

}

★Thread-safe collections

・ConcurrentMap (Interface)

putIfAbsent(K key, V value) : V

remove(Object key, Object value) : boolean

replace(K key, V value) : V

replace(K key, V oldValue, V newValue) : boolean

・ConcurrentHashMap

・ConcurrentSkipListMap

・ConcurrentSkipListSet

・CopyOnWriteArrayList

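Every mutating operation creates a fresh copy of the underlying list

・CopyOnWriteArraySet

Every mutating operation creates a fresh copy of the underlying set

Read operations, including get, do not lock the object.

Write operations, including put and remove, do take a lock.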
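
The return values of the four ConcurrentMap methods listed above can be checked with a small sketch. ConcurrentMapOps and demo are hypothetical names; the keys and values are arbitrary.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConcurrentMapOps {

    // Calls each atomic operation once or twice and records what it returned.
    static String demo() {
        ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
        StringBuilder out = new StringBuilder();
        out.append(map.putIfAbsent("a", 1)).append(',');  // null: key was absent, 1 is stored
        out.append(map.putIfAbsent("a", 2)).append(',');  // 1: existing value is kept
        out.append(map.replace("a", 1, 10)).append(',');  // true: value was 1, now 10
        out.append(map.replace("a", 1, 20)).append(',');  // false: current value is 10
        out.append(map.replace("b", 5)).append(',');      // null: "b" is absent, nothing stored
        out.append(map.remove("a", 99)).append(',');      // false: value does not match
        out.append(map.remove("a", 10)).append(',');      // true: entry removed
        out.append(map.isEmpty());                        // true
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // null,1,true,false,null,false,true,true
    }
}
```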

public class Sample implements Runnable{

private ConcurrentHashMap<String, String> map;

public Sample(ConcurrentHashMap<String, String> map) {

this.map = map;

}

@Override

public void run() {

while(!map.isEmpty()){

for (String key : map.keySet()) {

String value = map.get(key);

if(map.containsKey(key)){

value = map.remove(key);

}

}

}

}

}

// Caller side

ConcurrentHashMap<String, String> map

= new ConcurrentHashMap<String, String>();

map.put("key1", "100");

map.put("key2", "200");

Sample sample1 = new Sample(map);

Sample sample2 = new Sample(map);

Thread thread1 = new Thread(sample1);

Thread thread2 = new Thread(sample2);

thread1.start();

thread2.start();
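
The sample above removes entries while iterating over keySet(). That works because ConcurrentHashMap iterators are weakly consistent; a plain HashMap detects the modification and throws ConcurrentModificationException (on a best-effort basis, per its Javadoc). A single-threaded sketch, with RemoveWhileIterating and drain as hypothetical names:

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RemoveWhileIterating {

    // Fills the map, then removes every key while iterating over keySet().
    // Returns true if the loop finished, false if the iterator failed fast.
    static boolean drain(Map<String, String> map) {
        map.put("key1", "100");
        map.put("key2", "200");
        map.put("key3", "300");
        try {
            for (String key : map.keySet()) {
                map.remove(key); // structural change during iteration
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ConcurrentHashMap: " + drain(new ConcurrentHashMap<>()));
        System.out.println("HashMap: " + drain(new HashMap<>()));
    }
}
```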

Difference between HashMap and ConcurrentHashMap

XXX<Integer, String> m1 = new XXX<Integer, String>();

m1.put(1, "001");

m1.put(2, "002");

for(Entry<Integer, String> entry : m1.entrySet()){

System.out.println("key:" + entry.getKey());

}

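・When XXX is HashMap, the output is key:1 key:2 (first to last).

・When XXX is ConcurrentHashMap, the output was key:2 key:1 (last to first) on older JDKs (Java 7 and earlier); Java 8 and later typically print key:1 key:2. Neither class guarantees an iteration order, so code should not depend on it.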
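
When a defined iteration order matters, a map that guarantees one can be used instead. The sketch below compares ConcurrentSkipListMap (sorted by key, thread-safe) with LinkedHashMap (insertion order, not thread-safe); IterationOrder and keys are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

class IterationOrder {

    // Inserts keys 2, 1, 3 in that order and returns them in iteration order.
    static String keys(Map<Integer, String> map) {
        map.put(2, "002");
        map.put(1, "001");
        map.put(3, "003");
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Integer, String> entry : map.entrySet()) {
            sb.append("key:").append(entry.getKey()).append(' ');
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // Sorted by key: key:1 key:2 key:3
        System.out.println(keys(new ConcurrentSkipListMap<>()));
        // Insertion order: key:2 key:1 key:3
        System.out.println(keys(new LinkedHashMap<>()));
    }
}
```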

ConcurrentHashMap

Sample 1

private Map<String, Object> map = new HashMap<>();

public Object getBean(String key) {

synchronized (map) {

Object bean = map.get(key);

if (bean == null) {

map.put(key, createBean());

bean = map.get(key);

}

return bean;

}

}

private ConcurrentMap<String, Object> map = new ConcurrentHashMap<>();

public Object getBean(String key) {

Object bean = map.get(key);

if (bean == null) {

map.putIfAbsent(key, createBean());

bean = map.get(key);

}

return bean;

}
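
With putIfAbsent, createBean() may run in several threads at once and all but one result is discarded. If the bean should be created only once per key, computeIfAbsent on a ConcurrentHashMap is an alternative (Java 8+). A sketch, where BeanRegistry, the creation counter, and creationsUnderContention are hypothetical and exist only to make the effect observable:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class BeanRegistry {

    private final ConcurrentMap<String, Object> map = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    public Object getBean(String key) {
        // ConcurrentHashMap applies the function at most once per key.
        // The function must not modify this map itself.
        return map.computeIfAbsent(key, k -> createBean());
    }

    private Object createBean() {
        created.incrementAndGet();
        return new Object();
    }

    // Looks up the same key from `threads` threads; returns how many beans were built.
    static int creationsUnderContention(int threads) {
        BeanRegistry registry = new BeanRegistry();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> registry.getBean("service"));
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return registry.created.get();
    }

    public static void main(String[] args) {
        System.out.println("beans created: " + creationsUnderContention(8));
    }
}
```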

Sample 2

public class Fibonacci {

public int fib(int n) {

if (n == 0 || n == 1) return n;

return fib(n - 2) + fib(n - 1);

}

}

public class Fibonacci {

private Map<Integer, Integer> cache = new ConcurrentHashMap<>();

public int fib(int n) {

if (n == 0 || n == 1) return n;

Integer result = cache.get(n);

if (result == null) {

synchronized (cache) {

result = cache.get(n);

if (result == null) {

result = fib(n - 2) + fib(n - 1);

cache.put(n, result);

}

}

}

return result;

}

}

public class Fibonacci {

private Map<Integer, Integer> cache = new ConcurrentHashMap<>();

public int fib(int n) {

if (n == 0 || n == 1) return n;

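Integer cached = cache.get(n);

if (cached != null) return cached;

// Compute outside the map: the function passed to ConcurrentHashMap.computeIfAbsent

// must not update the same map, so a recursive call there can hang or throw IllegalStateException.

int result = fib(n - 2) + fib(n - 1);

cache.putIfAbsent(n, result);

return result;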

}

}

CopyOnWriteArrayList

private List<Listener> listeners = new ArrayList<Listener>();

public boolean addListener(Listener listener) {

synchronized (listeners) {

return listeners.add(listener);

}

}

public void execute() {

synchronized (listeners) {

for (Listener listener : listeners) {

listener.handle();

}

}

}

private List<Listener> listeners = new CopyOnWriteArrayList<Listener>();

public boolean addListener(Listener listener) {

return listeners.add(listener);

}

public void execute() {

for (Listener listener : listeners) {

listener.handle();

}

}
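
The lock-free execute() above is safe because a CopyOnWriteArrayList iterator works on a snapshot of the array taken when the iteration starts. A small sketch of that behaviour, with SnapshotIteration and addWhileIterating as hypothetical names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIteration {

    // Adds an element on every loop pass; the running iterator does not see
    // the additions, and no ConcurrentModificationException is thrown.
    static String addWhileIterating() {
        List<String> listeners = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        int visited = 0;
        for (String name : listeners) {
            listeners.add(name + "2"); // copies the backing array
            visited++;
        }
        return visited + "/" + listeners.size();
    }

    public static void main(String[] args) {
        System.out.println(addWhileIterating()); // 2/4: two visited, four stored
    }
}
```

Since every write copies the array, this trade-off suits lists that are read far more often than they are changed, such as listener lists.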

CopyOnWriteArraySet

public class StateHolder {

private final Set<StateListener> listeners = new HashSet<>();

private int state;

public void addStateListener(StateListener listener) {

synchronized (listeners) {

listeners.add(listener);

}

}

public void removeStateListener(StateListener listener) {

synchronized (listeners) {

listeners.remove(listener);

}

}

public int getState() {

synchronized (listeners) {

return state;

}

}

public void setState(int state) {

int oldState;

synchronized (listeners) {

oldState = this.state; // read and write under the same lock so no update is missed

this.state = state;

}

if (oldState != state) {

broadcast(new StateEvent(oldState, state));

}

}

private void broadcast(StateEvent stateEvent) {

Set<StateListener> snapshot;

synchronized (listeners) {

// Avoid ConcurrentModificationException

snapshot = new HashSet<>(listeners);

}

for (StateListener listener : snapshot) {

listener.stateChanged(stateEvent);

}

}

}

public class StateHolder {

private final Set<StateListener> listeners = new CopyOnWriteArraySet<>();

private final AtomicInteger state = new AtomicInteger();

public void addStateListener(StateListener listener) {

listeners.add(listener);

}

public void removeStateListener(StateListener listener) {

listeners.remove(listener);

}

public int getState() {

return state.get();

}

public void setState(int state) {

int oldState = this.state.getAndSet(state);

if (oldState != state) {

broadcast(new StateEvent(oldState, state));

}

}

private void broadcast(StateEvent stateEvent) {

for (StateListener listener : listeners) {

listener.stateChanged(stateEvent);

}

// listeners.forEach(listener -> listener.stateChanged(stateEvent));

}

}