Thread,wait,notify,Hashtableなどの代わりに
java.util.concurrent下のものを利用すべき。
・Executor Framework
・Concurrent Collection
・ConcurrentHashMapなど
・Synchronizer
・CountDownLatch 一つの大きいタスクを複数の小さいタスクに分けて、すべての小さいタスクが終わる次第に結果をまとめる 再利用不可
・Semaphore 実行中のスレッド数を制限
・CyclicBarrier すべてのスレッドをWaiting状態にさせて、ある条件に満ちたら、続く 再利用可(reset())
・Exchanger スレッド間にデータを交換
★RunnableとCallable
public interface Runnable {
void run(); ←デメリット:戻り値なし、例外なし
}
public interface Callable<V> {
V call() throws Exception;
}
〇Runnableサンプル
class Calculator implements Runnable{
@Override
public void run() {
// do something
}
}
ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(new Calculator());
es.shutdown();
〇Callableサンプル
class Calculator implements Callable<Integer>{
private int money;
public Calculator(int money) {
this.money = money;
}
@Override
public Integer call() throws Exception {
TimeUnit.MILLISECONDS.sleep(10000); // 10m
return this.money * 2;
}
}
ExecutorService es = Executors.newSingleThreadExecutor();
//方法1
Future<Integer> future = es.submit(new Calculator(100));
//方法2
FutureTask<Integer> future = new FutureTask<Integer>(new Calculator(100));
es.submit(future);
while(!future.isDone()){
TimeUnit.MILLISECONDS.sleep(200);
System.out.print("#");
}
Integer result = future.get(); // エラー時にExecutionException
//future.get(3, TimeUnit.SECONDS); // エラー時にTimeoutException
es.shutdown();
class Task implements Callable<Integer> { ... }
// 書き方1
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
executor.shutdown();
out.println(result.get());
// 書き方2
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
executor.submit(futureTask);
executor.shutdown();
out.println(futureTask.get());
// 書き方3
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
Thread thread = new Thread(futureTask);
thread.start();
out.println(futureTask.get());
★Callableの利用例
@EJB
private Caller caller;
@Test
public void testXxx() throws Exception {
caller.call(new Callable<Object>() {
public Object call() throws Exception {
doSomething();
return null;
}
});
}
public static interface Caller {
public <V> V call(Callable<V> callable) throws Exception;
}
@Stateless
@TransactionAttribute(REQUIRES_NEW)
public static class TestBean implements Caller {
public <V> V call(Callable<V> callable) throws Exception {
return callable.call();
}
}
★CountDownLatch補足
用法1:一斉に終了
final int COUNT = 10;
final CountDownLatch completeLatch = new CountDownLatch(COUNT);
for (int i = 0; i < COUNT; ++i) {
Thread thread = new Thread("worker " + i) {
public void run() {
...
completeLatch.countDown();
}
};
thread.start();
}
...
completeLatch.await();
用法2:一斉に開始
final int COUNT = 10;
final CountDownLatch startLatch = new CountDownLatch(1);
for (int i = 0; i < COUNT; ++i) {
Thread thread = new Thread("worker " + i) {
public void run() {
try {
startLatch.await();
} catch (InterruptedException e) {
return;
}
...
}
};
thread.start();
}
...
startLatch.countDown();
同じ処理
書き方1(一番簡単)
final CountDownLatch latch = new CountDownLatch(1);
Thread thread = new Thread("worker") {
public void run() {
...
latch.countDown();
}
};
thread.start();
latch.await();
書き方2
final Object completeSignal = new Object();
Thread thread = new Thread("worker") {
public void run() {
...
synchronized (completeSignal) {
completeSignal.notifyAll();
}
}
};
synchronized (completeSignal) {
thread.start();
completeSignal.wait();
}
書き方3
final Lock lock = new ReentrantLock();
final Condition completeSignal = lock.newCondition();
Thread thread = new Thread("worker") {
public void run() {
...
lock.lock();
try {
completeSignal.signalAll();
} finally {
lock.unlock();
}
}
};
lock.lock();
try {
thread.start();
completeSignal.await();
} finally {
lock.unlock();
}
★CompletionService
完成したタスクの順番で
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService
= new ExecutorCompletionService<Integer>(newFixedThreadPool);
for(int i = 0; i <= 10; i++){
final int task = i;
completionService.submit(new Callable<Integer>() {
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(3000)); //3秒まで
return task;
}
});
}
for(int i = 0; i <= 10; i++){
out.println("Task completed:" + completionService.take().get());
}
★Thread Safeなコレクション
・ConcurrentMap (Interface)
putIfAbsent(K key, V value) : V
remove(Object key, Object value) : boolean
replace(K key, V value) : V
replace(K key, V oldValue, V newValue) : boolean
・ConcurrentHashMap
・ConcurrentSkipListMap
・ConcurrentSkipListSet
・CopyOnWriteArrayList
元のリストのコピーを作成
・CopyOnWriteArraySet
元のセットのコピーを作成
getメソッドを含む読み込み処理ではオブジェクトがロックされない
putやremoveメソッドを含む書き込み処理ではオブジェクトがロックされる
public class Sample implements Runnable{
private ConcurrentHashMap<String, String> map;
public Sample(ConcurrentHashMap<String, String> map) {
this.map = map;
}
@Override
public void run() {
while(!map.isEmpty()){
for (String key : map.keySet()) {
String value = map.get(key);
if(map.containsKey(key)){
value = map.remove(key);
}
}
}
}
}
// 利用側
ConcurrentHashMap<String, String> map
= new ConcurrentHashMap<String, String>();
map.put("key1", "100");
map.put("key2", "200");
Sample sample1 = new Sample(map);
Sample sample2 = new Sample(map);
Thread thread1 = new Thread(sample1);
Thread thread2 = new Thread(sample2);
thread1.start();
thread2.start();
HashMapとConcurrentHashMapの区別
XXX<Integer, String> m1 = new XXX<Integer, String>();
m1.put(1, "001");
m1.put(2, "002");
for(Entry<Integer, String> entry : m1.entrySet()){
System.out.println("key:" + entry.getKey());
}
・XXXがHashMapの場合 出力はkey:1 key:2 出力順番は最初⇒最後
・XXXがConcurrentHashMapの場合 出力はkey:2 key:1 出力順番は最後⇒最初
ConcurrentHashMap
サンプル1
private Map<String, Object> map = new HashMap<>();
public Object getBean(String key) {
synchronized (map) {
Object bean = map.get(key);
if (bean == null) {
map.put(key, createBean());
bean = map.get(key);
}
return bean;
}
}
↓
private ConcurrentMap<String, Object> map = new ConcurrentHashMap<>();
public Object getBean(String key) {
Object bean = map.get(key);
if (bean == null) {
map.putIfAbsent(key, createBean());
bean = map.get(key);
}
return bean;
}
サンプル2
public class Fibonacci {
public int fib(int n) {
if (n == 0 || n == 1) return n;
return fib(n - 2) + fib(n - 1);
}
}
↓
public class Fibonacci {
private Map<Integer, Integer> cache = new ConcurrentHashMap<>();
public int fib(int n) {
if (n == 0 || n == 1) return n;
Integer result = cache.get(n);
if (result == null) {
synchronized (cache) {
result = cache.get(n);
if (result == null) {
result = fib(n - 2) + fib(n - 1);
cache.put(n, result);
}
}
}
return result;
}
}
↓
public class Fibonacci {
private Map<Integer, Integer> cache = new ConcurrentHashMap<>();
public int fib(int n) {
if (n == 0 || n == 1) return n;
return cache.computeIfAbsent(n, (key) -> fib(n - 2) + fib(n - 1));
}
}
CopyOnWriteArrayList
private List<Listener> listeners = new ArrayList<Listener>();
public boolean addListener(Listener listener) {
synchronized (listeners) {
return listeners.add(listener);
}
}
public void execute() {
synchronized (listeners) {
for (Listener listener : listeners) {
listener.handle();
}
}
}
↓
private List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
public boolean addListener(Listener listener) {
return listeners.add(listener);
}
public void execute() {
for (Listener listener : listeners) {
listener.handle();
}
}
CopyOnWriteArraySet
public class StateHolder {
private final Set<StateListener> listeners = new HashSet<>();
private int state;
public void addStateListener(StateListener listener) {
synchronized (listeners) {
listeners.add(listener);
}
}
public void removeStateListener(StateListener listener) {
synchronized (listeners) {
listeners.remove(listener);
}
}
public int getState() {
synchronized (listeners) {
return state;
}
}
public void setState(int state) {
int oldState = this.state;
synchronized (listeners) {
this.state = state;
}
if (oldState != state) {
broadcast(new StateEvent(oldState, state));
}
}
private void broadcast(StateEvent stateEvent) {
Set<StateListener> snapshot;
synchronized (listeners) {
// Avoid ConcurrentModificationException
snapshot = new HashSet<>(listeners);
}
for (StateListener listener : snapshot) {
listener.stateChanged(stateEvent);
}
}
}
↓
public class StateHolder {
private final Set<StateListener> listeners = new CopyOnWriteArraySet<>();
private final AtomicInteger state = new AtomicInteger();
public void addStateListener(StateListener listener) {
listeners.add(listener);
}
public void removeStateListener(StateListener listener) {
listeners.remove(listener);
}
public int getState() {
return state.get();
}
public void setState(int state) {
int oldState = this.state.getAndSet(state);
if (oldState != state) {
broadcast(new StateEvent(oldState, state));
}
}
private void broadcast(StateEvent stateEvent) {
for (StateListener listener : listeners) {
listener.stateChanged(stateEvent);
}
// listeners.forEach(listener -> listener.stateChanged(stateEvent));
}
}