21 コンパイル型‎ > ‎Java‎ > ‎JavaSE‎ > ‎

Concurrent

Thread,wait,notify,Hashtableなどの代わりに
java.util.concurrent下のものを利用すべき。
・Executor Framework
・Concurrent Collection
    ・ConcurrentHashMapなど
・Synchronizer
    ・CountDownLatch    一つの大きいタスクを複数の小さいタスクに分けて、すべての小さいタスクが終わる次第に結果をまとめる           再利用不可
    ・Semaphore            実行中のスレッド数を制限
    ・CyclicBarrier         すべてのスレッドをWaiting状態にさせて、ある条件に満ちたら、続く            再利用可(reset())
    ・Exchanger            スレッド間にデータを交換

                            CyclicBarrier                                                                                   CountDownLatch                                                                        Semaphore

Synchronizer demo

java.util.concurrentパッケージまとめ

※java.util.concurrentパッケージのcollectionはジェネリックを参照。

★RunnableとCallable
public interface Runnable {
    void run();        ←デメリット:戻り値なし、例外なし
}
public interface Callable<V> {
    V call() throws Exception;
}

〇Runnableサンプル
class Calculator implements Runnable{
    @Override
    public void run() {
        // do something
    }
}

ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(new Calculator());
es.shutdown();

〇Callableサンプル
class Calculator implements Callable<Integer>{
    private int money;
    public Calculator(int money) {
        this.money = money;
    }

    @Override
    public Integer call() throws Exception {
        TimeUnit.MILLISECONDS.sleep(10000);    // 10m
        return this.money * 2;
    }
}

ExecutorService es = Executors.newSingleThreadExecutor();

//方法1
Future<Integer> future = es.submit(new Calculator(100));

//方法2
FutureTask<Integer> future = new FutureTask<Integer>(new Calculator(100));
es.submit(future);

while(!future.isDone()){
    TimeUnit.MILLISECONDS.sleep(200);
    System.out.print("#");
}

Integer result = future.get();      // エラー時にExecutionException
//future.get(3, TimeUnit.SECONDS);  // エラー時にTimeoutException

es.shutdown();

class Task implements Callable<Integer> { ... }

// 書き方1
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
executor.shutdown();
out.println(result.get());

// 書き方2
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
executor.submit(futureTask);
executor.shutdown();
out.println(futureTask.get());
 
// 書き方3
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
Thread thread = new Thread(futureTask);
thread.start();
out.println(futureTask.get());

★Callableの利用例
@EJB
private Caller caller;

@Test
public void testXxx() throws Exception {
    caller.call(new Callable<Object>() {
        public Object call() throws Exception {
            doSomething();
            return null;
        }
    });
}

public static interface Caller {
    public <V> V call(Callable<V> callable) throws Exception;
}

@Stateless
@TransactionAttribute(REQUIRES_NEW)
public static class TestBean implements Caller {
    public <V> V call(Callable<V> callable) throws Exception {
        return callable.call();
    }
}

★CountDownLatch補足
用法1:一斉に終了
final int COUNT = 10;
final CountDownLatch completeLatch = new CountDownLatch(COUNT);

for (int i = 0; i < COUNT; ++i) {
    Thread thread = new Thread("worker " + i) {
        public void run() {
            ...
            completeLatch.countDown();
        }
    };
    thread.start();
}
...
completeLatch.await();

用法2:一斉に開始
final int COUNT = 10;
final CountDownLatch startLatch = new CountDownLatch(1);

for (int i = 0; i < COUNT++i) {
    Thread thread = new Thread("worker " + i) {
        public void run() {
            try {
                startLatch.await();
            } catch (InterruptedException e) {
                return;
            }
            ...
        }
    };
    thread.start();
}
...
startLatch.countDown();

同じ処理
書き方1(一番簡単)
final CountDownLatch latch = new CountDownLatch(1);
Thread thread = new Thread("worker") {
    public void run() {
        ...
        latch.countDown();
    }
};
thread.start();
latch.await();
書き方2
final Object completeSignal = new Object();
Thread thread = new Thread("worker") {
    public void run() {
        ...
        synchronized (completeSignal) {
            completeSignal.notifyAll();
        }
    }
};

synchronized (completeSignal) {
    thread.start();
    completeSignal.wait();
}
書き方3
final Lock lock = new ReentrantLock();
final Condition completeSignal = lock.newCondition();
Thread thread = new Thread("worker") {
    public void run() {
        ...
        lock.lock();
        try {
            completeSignal.signalAll();
        } finally {
            lock.unlock();
        }
    }
};

lock.lock();
try {
    thread.start();
    completeSignal.await();
} finally {
    lock.unlock();
}

★CompletionService
完成したタスクの順番で
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService 
    = new ExecutorCompletionService<Integer>(newFixedThreadPool);

for(int i = 0; i <= 10; i++){
    final int task = i;
    completionService.submit(new Callable<Integer>() {
        public Integer call() throws Exception {
            Thread.sleep(new Random().nextInt(3000)); //3秒まで
            return task;
        }
    });
}

for(int i = 0; i <= 10; i++){
    out.println("Task completed:" + completionService.take().get());
}


★Thread Safeなコレクション
 Thread Safeless Thread Safe
 ArrayList CopyOnWriteArrayList
 HashSet CopyOnWriteArraySet
 HashMap ConcurrentHashMap
 TreeMap ConcurrentSkipListMap
 TreeSet ConcurrentSkipListSet

・ConcurrentMap (Interface)
    putIfAbsent(K key, V value) : V
    remove(Object key, Object value) : boolean
    replace(K key, V value) : V
    replace(K key, V oldValue, V newValue) : boolean
・ConcurrentHashMap
・ConcurrentSkipListMap
・ConcurrentSkipListSet
・CopyOnWriteArrayList
    元のリストのコピーを作成
・CopyOnWriteArraySet
    元のセットのコピーを作成
getメソッドを含む読み込み処理ではオブジェクトがロックされない
putやremoveメソッドを含む書き込み処理ではオブジェクトがロックされる

public class Sample implements Runnable{
    private ConcurrentHashMap<String, String> map;

    public Sample(ConcurrentHashMap<String, String> map) {
        this.map = map;
    }

    @Override
    public void run() {
        while(!map.isEmpty()){
            for (String key : map.keySet()) {
                String value = map.get(key);

                if(map.containsKey(key)){
                    value = map.remove(key);
                }
            }
        }
    }
}

// 利用側
ConcurrentHashMap<String, String> map
    = new ConcurrentHashMap<String, String>();

map.put("key1", "100");
map.put("key2", "200");

Sample sample1 = new Sample(map);
Sample sample2 = new Sample(map);

Thread thread1 = new Thread(sample1);
Thread thread2 = new Thread(sample2);

thread1.start();
thread2.start();

HashMapとConcurrentHashMapの区別
XXX<Integer, String> m1 = new XXX<Integer, String>();  
m1.put(1, "001");  
m1.put(2, "002");  
for(Entry<Integer, String> entry : m1.entrySet()){  
    System.out.println("key:" + entry.getKey());  
・XXXがHashMapの場合                    出力はkey:1 key:2        出力順番は最初⇒最後
・XXXがConcurrentHashMapの場合    出力はkey:2 key:1        出力順番は最後⇒最初

ConcurrentHashMap
サンプル1
private Map<String, Object> map = new HashMap<>();
public Object getBean(String key) {
    synchronized (map) {
        Object bean = map.get(key);
        if (bean == null) {
            map.put(key, createBean());
            bean = map.get(key);
        }
        return bean;
    }
}
    ↓
private ConcurrentMap<String, Object> map = new ConcurrentHashMap<>();
public Object getBean(String key) {
    Object bean = map.get(key);
    if (bean == null) {
        map.putIfAbsent(key, createBean());
        bean = map.get(key);
    }
    return bean;
}

サンプル2
public class Fibonacci {
    public int fib(int n) {
        if (n == 0 || n == 1) return n;

        return fib(n - 2) + fib(n - 1);
    }
}
    ↓
public class Fibonacci {

    private Map<Integer, Integer> cache = new ConcurrentHashMap<>();

    public int fib(int n) {
        if (n == 0 || n == 1) return n;

        Integer result = cache.get(n);

        if (result == null) {
            synchronized (cache) {
                result = cache.get(n);

                if (result == null) {
                    result = fib(n - 2) + fib(n - 1);
                    cache.put(n, result);
                }
            }
        }

        return result;
    }
}
    ↓
public class Fibonacci {

    private Map<Integer, Integer> cache = new ConcurrentHashMap<>();

    public int fib(int n) {
        if (n == 0 || n == 1) return n;

        return cache.computeIfAbsent(n, (key) -> fib(n - 2) + fib(n - 1));
    }
}

CopyOnWriteArrayList
private List<Listener> listeners = new ArrayList<Listener>();

public boolean addListener(Listener listener) {
    synchronized (listeners) {
        return listeners.add(listener);
    }
}
public void execute() {
    synchronized (listeners) {
        for (Listener listener : listeners) {
            listener.handle();
        }
    }
}
    ↓
private List<Listener> listeners = new CopyOnWriteArrayList<Listener>();

public boolean addListener(Listener listener) {
    return listeners.add(listener);
}
public void execute() {
    for (Listener listener : listeners) {
        listener.handle();
    }
}

CopyOnWriteArraySet
public class StateHolder {
 
    private final Set<StateListener> listeners = new HashSet<>();
    private int state;

    public void addStateListener(StateListener listener) {
        synchronized (listeners) {
            listeners.add(listener);
        }
    }

    public void removeStateListener(StateListener listener) {
        synchronized (listeners) {
            listeners.remove(listener);
        }
    }

    public int getState() {
        synchronized (listeners) {
            return state;
        }
    }

    public void setState(int state) {
        int oldState = this.state;
        synchronized (listeners) {
            this.state = state;
        }
        if (oldState != state) {
            broadcast(new StateEvent(oldState, state));
        }
    }

    private void broadcast(StateEvent stateEvent) {
        Set<StateListener> snapshot;
        synchronized (listeners) {
            // Avoid ConcurrentModificationException
            snapshot = new HashSet<>(listeners);
        }
        for (StateListener listener : snapshot) {
            listener.stateChanged(stateEvent);
        }
    }
}
    ↓
public class StateHolder {
 
    private final Set<StateListener> listeners = new CopyOnWriteArraySet<>();
    private final AtomicInteger state = new AtomicInteger();

    public void addStateListener(StateListener listener) {
        listeners.add(listener);
    }

    public void removeStateListener(StateListener listener) {
        listeners.remove(listener);
    }

    public int getState() {
        return state.get();
    }

    public void setState(int state) {
        int oldState = this.state.getAndSet(state);
        if (oldState != state) {
            broadcast(new StateEvent(oldState, state));
        }
    }

    private void broadcast(StateEvent stateEvent) {
        for (StateListener listener : listeners) {
            listener.stateChanged(stateEvent);
        }
        // listeners.forEach(listener -> listener.stateChanged(stateEvent));
    }
}