Thread synchronization
Blocking Synchronization (mutex, semaphore)
Problems: deadlock, livelock, priority inversion, performance overhead, etc.
Non-blocking Synchronization
・Wait-free
・Lock-free
・Obstruction-free
← complex    simple →
Wait-free Lock-free Obstruction-free Atomic Lockless-based Lock-based
← fine-grained locking    coarse-grained locking →
Protecting shared data
Synchronized Lock ReadWriteLock volatile Atomic-classes ThreadLocal
Atomic + Atomic != Atomic
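"Atomic + Atomic != Atomic" means that two operations that are each atomic do not form an atomic compound operation. A minimal sketch of this, assuming a hypothetical Range class (not part of these notes) whose invariant is lower <= upper:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration. Invariant: lower <= upper.
class Range {
    private final AtomicInteger lower = new AtomicInteger(0);
    private final AtomicInteger upper = new AtomicInteger(10);

    // NOT thread-safe: get() and set() are each atomic, but another thread
    // may change `upper` between the check and the write.
    void setLowerUnsafe(int value) {
        if (value > upper.get()) {
            throw new IllegalArgumentException("lower > upper");
        }
        lower.set(value);
    }

    // Thread-safe: one shared lock turns each check-then-act into a single critical section.
    synchronized void setLower(int value) {
        if (value > upper.get()) {
            throw new IllegalArgumentException("lower > upper");
        }
        lower.set(value);
    }

    synchronized void setUpper(int value) {
        if (value < lower.get()) {
            throw new IllegalArgumentException("upper < lower");
        }
        upper.set(value);
    }

    synchronized boolean isValid() {
        return lower.get() <= upper.get();
    }

    public static void main(String[] args) {
        Range range = new Range();
        range.setLower(5);
        range.setUpper(7);
        System.out.println(range.isValid()); // prints true
    }
}
```

Once every access goes through the same lock, plain int fields would work as well; the atomics are kept here only to show that they do not help with the compound check.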
★synchronized basics
Required when a single object with mutable state is used by multiple threads
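public class Target {
    // Method level: holds the lock on this for the whole method
    public synchronized void method() {
        // do something
    }
    // Block level: holds the lock on this only inside the block
    // (renamed from method() so that both variants compile in one class)
    public void blockMethod() {
        synchronized (this) {
            // do something
        }
    }
}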
public class Sample implements Runnable{
private Target target;
public Sample(Target target) {
this.target = target;
}
@Override
public void run() {
this.target.method();
}
}
// Caller side
Target target = new Target();
Sample sample1 = new Sample(target);
Sample sample2 = new Sample(target);
Thread thread1 = new Thread(sample1);
Thread thread2 = new Thread(sample2);
thread1.start();
thread2.start();
★Making a class thread-safe
Approach 1: synchronized methods
class Dummy {
private boolean flag = true;
public synchronized boolean isFlag() {
return flag;
}
public synchronized void setFlag(boolean flag) {
this.flag = flag;
}
}
Approach 2: volatile read, synchronized write
class Dummy {
private volatile boolean flag = true;
public boolean isFlag() {
return flag;
}
public synchronized void setFlag(boolean flag) {
this.flag = flag;
}
}
Approach 3: ReadWriteLock
class Dummy {
private boolean flag = true;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock(); // multi thread
private final Lock writeLock = lock.writeLock(); // only one thread
public boolean isFlag() {
readLock.lock();
try {
return flag;
} finally {
readLock.unlock();
}
}
public void setFlag(boolean flag) {
writeLock.lock();
try {
this.flag = flag;
} finally {
writeLock.unlock();
}
}
}
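Approach 4: AtomicXXX
class Dummy {
    private final AtomicBoolean flag = new AtomicBoolean(true);
    // Return the current value, not the AtomicBoolean itself,
    // so that callers cannot modify the internal state
    public boolean isFlag() {
        return flag.get();
    }
    public void setFlag(boolean flag) {
        // Equivalent to this.flag.set(flag); written as a loop to show the CAS retry pattern
        boolean temp;
        do {
            temp = this.flag.get();
        } while (!this.flag.compareAndSet(temp, flag));
    }
}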
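※In terms of performance, generally (actual results depend on the JVM, hardware, and contention):
・Writes
← fast    slow →
no modifier > volatile > AtomicXXX >> synchronized > ReentrantReadWriteLock
・Reads
← fast    slow →
no modifier > volatile ≒ AtomicXXX >> ReentrantReadWriteLock > synchronized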
★About lock objects
Object level locking
make non-static data thread safe
public synchronized void syncMethod(){...}
public void syncMethod(){
synchronized (this) {...}
}
private final Object lock = new Object();
public void syncMethod(){
synchronized (lock) {...}
}
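Approach 1
private final Integer lock = 10; // × autoboxed value from the shared Integer cache
private final Integer lock = new Integer(10); // ○ distinct instance (constructor deprecated since Java 9)
private final String lock = new String("LOCK").intern(); // × interned string, shared JVM-wide
private final String lock = "LOCK"; // × string literal, shared JVM-wide
private final String lock = new String("LOCK"); // ○ distinct instance
private final Object lock = new Object(); // ◎ recommended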
public void method() {
synchronized(lock){
...
}
}
Approach 2
private final Lock lock = new ReentrantLock();
public void method() {
lock.lock();
try{
...
}finally{
lock.unlock();
}
}
Class level locking
make static data thread safe
public synchronized static void syncMethod(){...}
public void syncMethod(){
synchronized (SyncClass.class) {...}
}
private final static Object lock = new Object();
public void syncMethod(){
synchronized (lock) {...}
}
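※Use a static lock object to protect shared static data
Reason: when several threads are started, each with its own instance, a non-static lock object is created once per instance, so the threads would not be locking on the same object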
class MyThread implements Runnable {
private static int counter;
private static final Object lock = new Object();
public void run() {
synchronized(lock){
counter++;
...
}
}
}
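The reason above can be checked with a small sketch. The class and method names here (StaticLockDemo, Worker, runWorkers) are hypothetical, and the loop count is arbitrary; each thread gets its own Worker instance, and only the static lock is common to all of them:

```java
import java.util.ArrayList;
import java.util.List;

class StaticLockDemo {
    private static int counter;
    private static final Object LOCK = new Object(); // one lock shared by every Worker

    static class Worker implements Runnable {
        // An instance field such as `private final Object lock = new Object();`
        // would be a different object in every Worker and would not protect
        // the shared static counter.
        @Override
        public void run() {
            for (int i = 0; i < 10_000; i++) {
                synchronized (LOCK) {
                    counter++;
                }
            }
        }
    }

    static int runWorkers(int threadCount) {
        synchronized (LOCK) {
            counter = 0;
        }
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new Worker()); // one Worker instance per thread
            threads.add(thread);
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (LOCK) {
            return counter;
        }
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(4)); // prints 40000
    }
}
```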
★lock vs synchronized
Synchronized = Lock + Condition
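lock
Under contention, the thread goes into a waiting state (WAITING / TIMED_WAITING)
Fair or non-fair ordering of threads can be configured
Implemented at the code (library) level
Flexible, richer feature set
ReadWriteLock allows even finer-grained control
synchronized
Under contention, the thread goes into a blocked state (BLOCKED)
Non-fair ordering of threads only
Implemented at the JVM (runtime) level
Convenient, safe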
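Two of the Lock-only features listed above, sketched with ReentrantLock. The class name LockFeaturesDemo and the method tryWork are hypothetical; the fairness constructor and the timed tryLock are standard ReentrantLock API:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class LockFeaturesDemo {
    // true = fair: the longest-waiting thread is favored when the lock is released.
    private final ReentrantLock fairLock = new ReentrantLock(true);

    boolean isFair() {
        return fairLock.isFair();
    }

    // Gives up after the timeout instead of waiting forever;
    // a synchronized block has no equivalent of this.
    boolean tryWork(long timeoutMillis) {
        boolean acquired = false;
        try {
            acquired = fairLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!acquired) {
                return false;
            }
            // critical section would go here
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            if (acquired) {
                fairLock.unlock(); // the caller must release explicitly, unlike synchronized
            }
        }
    }

    public static void main(String[] args) {
        LockFeaturesDemo demo = new LockFeaturesDemo();
        System.out.println(demo.isFair());       // prints true
        System.out.println(demo.tryWork(100));   // prints true (no contention here)
    }
}
```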
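★About volatile
Without volatile, there is no guarantee that a change made to a variable by one thread becomes visible to other threads promptly.
volatile alone does not guarantee thread safety for compound operations; it only guarantees visibility (a read sees the latest value written).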
1.State flag
public class StopThread {
private static volatile boolean stopRequested = false;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while(!stopRequested) {...};
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
stopRequested = true;
}
}
2.Simple counter
private volatile int count = 0;
public synchronized void increment() {
count++;
}
public int getCount() {
return count;
}
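Note: ++ is not an atomic operation, so in a multithreaded environment increment() must be synchronized; otherwise updates can be lost and duplicate values may be produced.
Improved version (typically performs better than synchronized)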
private AtomicInteger count = new AtomicInteger();
public void increment() {
count.incrementAndGet(); // getAndIncrement() is also available
}
public int getCount() {
return count.get();
}
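3.Other (double-checked locking)
// static added on the assumption that this is a singleton guarded by the class lock
private static volatile IoBufferAllocator instance;
public static IoBufferAllocator getInstance() {
    if (instance == null) {                      // first check, without the lock
        synchronized (IoBufferAllocator.class) {
            if (instance == null) {              // second check, under the lock
                instance = new IoBufferAllocator();
            }
        }
    }
    return instance;
}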
★About ThreadLocal
A mechanism that gives each thread exactly one object of its own
public class Test {
private static ThreadLocal<Integer> tl = new ThreadLocal<Integer>();
public static void main(String[] args) {
for(int i = 0; i < 2; i++){
new Thread( new Runnable(){
@Override
public void run() {
int data = new Random().nextInt();
System.out.println(Thread.currentThread().getName() + " set:" + data);
// Store the data in the currently running thread
tl.set(data); // simple value
Target target = Target.getThreadInstance(); // more complex object
target.setData(data);
get();
}
}).start();
}
}
public static void get(){
int data = tl.get();
System.out.println(Thread.currentThread().getName() + " get:" + data);
Target target = Target.getThreadInstance();
System.out.println(Thread.currentThread().getName() + " getData : " + target.getData());
}
}
class Target {
private Target(){}
// synchronized is not needed: each thread only touches its own ThreadLocal value
public static /*synchronized*/ Target getThreadInstance(){
Target instance = map.get();
if(instance == null){
instance = new Target();
map.set(instance);
}
return instance;
}
private static ThreadLocal<Target> map = new ThreadLocal<Target>();
private int data;
// getData/setData
}
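Usage example 1
private static final ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
    @Override
    protected Connection initialValue() {
        try {
            return DriverManager.getConnection(DB_URL);
        } catch (SQLException e) {
            // initialValue() cannot throw checked exceptions
            throw new IllegalStateException(e);
        }
    }
};
public static Connection getConnection() {
    return connectionHolder.get();
}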
Usage example 2
private static final ThreadLocal<Session> threadSession = new ThreadLocal<>();
public static Session getSession() throws InfrastructureException {
Session session = threadSession.get();
try {
if (session == null) {
session = getSessionFactory().openSession();
threadSession.set(session);
}
} catch (HibernateException ex) {
throw new InfrastructureException(ex);
}
return session;
}
synchronized
// An intrinsic lock has only one wait set, so a single monitor serves both
// the "not empty" and the "not full" condition. Waiting on one monitor while
// holding another would block the opposite side and deadlock.
private final Object lock = new Object();
private final Queue<Object> linkedList = new LinkedList<Object>();
private final int maxLength = 10;
public Object take() throws InterruptedException {
    synchronized (lock) {
        // Loop, not if: wait() may wake spuriously or after another consumer took the element
        while (linkedList.size() == 0) {
            lock.wait();
        }
        Object object = linkedList.poll();
        lock.notifyAll(); // wake producers waiting for free space
        return object;
    }
}
public void offer(Object object) throws InterruptedException {
    synchronized (lock) {
        while (linkedList.size() == maxLength) {
            lock.wait();
        }
        linkedList.add(object);
        lock.notifyAll(); // wake consumers waiting for an element
    }
}
lock
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
private final Queue<Object> linkedList = new LinkedList<Object>();
private final int maxLength = 10;
public Object take() throws InterruptedException {
    lock.lock();
    try {
        // Loop, not if: guards against spurious wakeups and competing consumers
        while (linkedList.size() == 0) {
            notEmpty.await();
        }
        Object object = linkedList.poll();
        notFull.signalAll(); // a slot was freed, wake waiting producers
        return object;
    } finally {
        lock.unlock();
    }
}
public void offer(Object object) throws InterruptedException {
    lock.lock();
    try {
        while (linkedList.size() == maxLength) {
            notFull.await();
        }
        linkedList.add(object);
        notEmpty.signalAll(); // an element is available, wake waiting consumers
    } finally {
        lock.unlock();
    }
}
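For comparison, the JDK already ships a bounded blocking queue, java.util.concurrent.ArrayBlockingQueue, which is built on one lock with two conditions. A minimal usage sketch; the class name BoundedQueueDemo and the method transfer are hypothetical. Note that in the JDK API the blocking insert is put(), while offer() returns false immediately when the queue is full:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueDemo {
    // Moves the numbers 1..items through a queue of capacity 10 and returns their sum.
    static int transfer(int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < items; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(100)); // prints 5050
    }
}
```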
wait / notify
private final Object lock = new Object();
private volatile boolean flag = false;
public void waitTillChange() {
synchronized(lock) {
while(!flag)
try {
lock.wait();
} catch(InterruptedException e) {...}
}
}
public void change() {
synchronized(lock) {
flag = true;
lock.notifyAll();
}
}
Condition
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile boolean flag = false;
public void waitTillChange() throws InterruptedException {
lock.lock();
try {
while(!flag) {
condition.await();
}
} finally {
lock.unlock();
}
}
public void change() {
lock.lock();
try {
flag = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
★Lock-Free = loop + CAS + break
To avoid locks, prefer the data structures that are built on CAS
・AtomicXXX, ConcurrentMap, CopyOnWriteArrayList, ConcurrentLinkedQueue, etc.
private volatile int max = 0;
public synchronized void set(int value) {
if (value > max) {
max = value;
}
}
public int getMax() {
return max;
}
↓
private AtomicInteger max = new AtomicInteger();
// Style 1
public void set(int value) {
for (;;) {
int current = max.get();
if (value > current) {
if (max.compareAndSet(current, value)) {
break;
} else {
continue;
}
} else {
break;
}
}
}
// Style 2
public void set(int value) {
int current;
do {
current = max.get();
if (value <= current) {
break;
}
} while (!max.compareAndSet(current, value));
}
public int getMax() {
return max.get();
}
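Since Java 8 the same loop + CAS pattern is available as AtomicInteger.accumulateAndGet, which retries internally until the update succeeds. A minimal sketch, assuming a hypothetical MaxTracker class and an arbitrary choice of 4 threads:

```java
import java.util.concurrent.atomic.AtomicInteger;

class MaxTracker {
    private final AtomicInteger max = new AtomicInteger();

    // Atomically replaces max with Math.max(max, value); the CAS retry loop is inside the JDK.
    void set(int value) {
        max.accumulateAndGet(value, Math::max);
    }

    int getMax() {
        return max.get();
    }

    // Four threads report the values 1..4000 between them; the maximum must be 4000.
    static int runDemo() {
        MaxTracker tracker = new MaxTracker();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            final int offset = t * 1000;
            threads[t] = new Thread(() -> {
                for (int i = 1; i <= 1000; i++) {
                    tracker.set(offset + i);
                }
            });
            threads[t].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return tracker.getMax();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints 4000
    }
}
```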
★Deadlock avoidance
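1.Avoid sharing resources
2.Spin Lock
try {
    if (lock.tryLock(2, TimeUnit.SECONDS)) {
        try {
            // do something
        } finally {
            lock.unlock(); // release only when the lock was actually acquired
        }
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // preserve the interrupt status
}
3.Phaser (JDK 7)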
4.Lock splitting
5.Lock ordering
6.Lock timeout
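Of the items above, lock ordering is probably the easiest to show in code: if every thread acquires the locks in the same global order, a circular wait cannot form. A minimal sketch, assuming a hypothetical Account class whose unique id defines that order:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; id is assumed unique and only used to order the locks.
class Account {
    final int id;
    int balance;
    final Lock lock = new ReentrantLock();

    Account(int id, int balance) {
        this.id = id;
        this.balance = balance;
    }
}

class LockOrderingDemo {
    // Always locks the account with the smaller id first, whatever the transfer direction.
    static void transfer(Account from, Account to, int amount) {
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        first.lock.lock();
        try {
            second.lock.lock();
            try {
                from.balance -= amount;
                to.balance += amount;
            } finally {
                second.lock.unlock();
            }
        } finally {
            first.lock.unlock();
        }
    }

    // Two threads transfer in opposite directions; without ordering this could deadlock.
    static int runDemo() {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) transfer(a, b, 1);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) transfer(b, a, 1);
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return a.balance + b.balance;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints 2000
    }
}
```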
★Performance tests of the various locks
※See the attached file for the test results
Usage example 3 (ThreadLocal)
public class Task implements Runnable {
private static final AtomicInteger nextId = new AtomicInteger(0);
private static final ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return nextId.getAndIncrement();
}
};
public int getThreadId() {
return threadId.get();
}
private static final ThreadLocal<Date> startDate = new ThreadLocal<Date>() {
protected Date initialValue() {
return new Date();
}
};
@Override
public void run() {
System.out.printf("Thread started: %s : %s\n", getThreadId(), startDate.get());
// ...
System.out.printf("Thread ended: %s : %s\n", getThreadId(), startDate.get());
}
}