Thread synchronization

Blocking Synchronization (mutex, semaphore)

Problems: deadlock, livelock, priority inversion, performance overhead, etc.

Non-blocking Synchronization

・Wait-free

・Lock-free

・Obstruction-free

← complex    simple →

Wait-free Lock-free Obstruction-free Atomic Lockless-based Lock-based

← fine-grained locking    coarse-grained locking →

Protecting shared data

Synchronized Lock ReadWriteLock volatile Atomic-classes ThreadLocal

Atomic + Atomic != Atomic
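The rule above means that two operations that are each atomic do not form an atomic pair. The sketch below illustrates this with a hypothetical HitCounter class (the class and method names are assumptions for illustration, not part of the original notes): recordRacy() chains two atomic calls and can lose updates under contention, while recordAtomic() uses ConcurrentHashMap.merge(), which performs the whole read-modify-write as one step.

```java
import java.util.concurrent.ConcurrentHashMap;

class HitCounter {
    private final ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();

    // Racy: get() and put() are each atomic, but another thread can update
    // the entry between the two calls, so an increment may be lost.
    void recordRacy(String key) {
        Integer old = hits.get(key);
        hits.put(key, old == null ? 1 : old + 1);
    }

    // Safe: merge() performs the whole read-modify-write as one atomic step.
    void recordAtomic(String key) {
        hits.merge(key, 1, Integer::sum);
    }

    int count(String key) {
        return hits.getOrDefault(key, 0);
    }

    // Runs `threads` workers that each call recordAtomic() `perThread` times.
    static int runAtomic(int threads, int perThread) {
        HitCounter counter = new HitCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.recordAtomic("page");
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.count("page");
    }

    public static void main(String[] args) {
        System.out.println(runAtomic(4, 10_000)); // prints 40000
    }
}
```

Swapping recordAtomic() for recordRacy() inside runAtomic() may produce a total below 40000, although the exact shortfall varies from run to run.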

★Basics of synchronized

Required when a single object with mutable state is used by multiple threads

public class Target{

// method level

public synchronized void method(){

// do something

}

public void methodWithBlock(){

// block level (renamed: two methods with the same signature do not compile)

synchronized(this){

// do something

}

}

}

public class Sample implements Runnable{

private Target target;

public Sample(Target target) {

this.target = target;

}

@Override

public void run() {

this.target.method();

}

}

// usage

Target target = new Target();

Sample sample1 = new Sample(target);

Sample sample2 = new Sample(target);

Thread thread1 = new Thread(sample1);

Thread thread2 = new Thread(sample2);

thread1.start();

thread2.start();

★Making a class thread-safe

Method 1: synchronized methods

class Dummy {

private boolean flag = true;

public synchronized boolean isFlag() {

return flag;

}

public synchronized void setFlag(boolean flag) {

this.flag = flag;

}

}

Method 2: volatile read, synchronized write

class Dummy {

private volatile boolean flag = true;

public boolean isFlag() {

return flag;

}

public synchronized void setFlag(boolean flag) {

this.flag = flag;

}

}

Method 3: ReadWriteLock

class Dummy {

private boolean flag = true;

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final Lock readLock = lock.readLock(); // multi thread

private final Lock writeLock = lock.writeLock(); // only one thread

public boolean isFlag() {

readLock.lock();

try {

return flag;

} finally {

readLock.unlock();

}

}

public void setFlag(boolean flag) {

writeLock.lock();

try {

this.flag = flag;

} finally {

writeLock.unlock();

}

}

}

Method 4: AtomicXXX

class Dummy {

private final AtomicBoolean flag = new AtomicBoolean(true);

public boolean isFlag() {

// return the value, not the AtomicBoolean itself, so callers cannot mutate internal state

return flag.get();

}

public void setFlag(boolean flag) {

// an unconditional write needs no CAS loop; set() is already atomic

this.flag.set(flag);

}

}
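compareAndSet() becomes useful when the new value depends on the current one. A minimal sketch, assuming a hypothetical StartOnce class (not from the original notes): many threads may call start(), but only the one that flips the flag from false to true runs the one-time setup.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class StartOnce {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger setups = new AtomicInteger();

    // Only the caller that flips false -> true performs the setup.
    boolean start() {
        if (started.compareAndSet(false, true)) {
            setups.incrementAndGet(); // stands in for real one-time setup work
            return true;
        }
        return false;
    }

    int setups() {
        return setups.get();
    }

    // Lets `threads` threads race on start() and reports how many setups ran.
    static int race(int threads) {
        StartOnce service = new StartOnce();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(service::start);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return service.setups();
    }

    public static void main(String[] args) {
        System.out.println(race(8)); // prints 1
    }
}
```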

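※In terms of raw speed, the typical ordering is as follows (actual results depend on the JVM, hardware, and contention):

・Writes

← fast    slow →

no modifier > volatile > AtomicXXX >> synchronized > ReentrantReadWriteLock

・Reads

← fast    slow →

no modifier > volatile ≒ AtomicXXX >> ReentrantReadWriteLock > synchronized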

★About lock objects

Object level locking

make non-static data thread safe

public synchronized void syncMethod(){...}

public void syncMethod(){

synchronized (this) {...}

}

private final Object lock = new Object();

public void syncMethod(){

synchronized (lock) {...}

}

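Method 1

private final Integer lock = 10; × (autoboxed small values come from a shared cache)

private final Integer lock = new Integer(10); ○ (distinct instance, but the constructor is deprecated since Java 9)

private final String lock = new String("LOCK").intern(); × (interned strings are shared JVM-wide)

private final String lock = "LOCK"; × (string literals are interned)

private final String lock = new String("LOCK"); ○

private final Object lock = new Object(); ◎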

public void method() {

synchronized(lock){

...

}

}
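The × entries above are unsafe because unrelated code can end up holding a reference to the very same object and synchronize on it. A small sketch (the class name LockIdentityDemo is an assumption for illustration) that compares references to show which candidates are shared:

```java
class LockIdentityDemo {
    // Autoboxing goes through Integer.valueOf(), which caches -128..127.
    static boolean boxedIntegerIsShared() {
        Integer a = 10;
        Integer b = 10;
        return a == b;
    }

    // Identical string literals are interned, so they are the same object.
    static boolean stringLiteralIsShared() {
        String a = "LOCK";
        String b = "LOCK";
        return a == b;
    }

    // Every new Object() is a distinct instance nobody else can reach.
    static boolean plainObjectIsShared() {
        Object a = new Object();
        Object b = new Object();
        return a == b;
    }

    public static void main(String[] args) {
        System.out.println(boxedIntegerIsShared());  // prints true
        System.out.println(stringLiteralIsShared()); // prints true
        System.out.println(plainObjectIsShared());   // prints false
    }
}
```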

Method 2

private final Lock lock = new ReentrantLock();

public void method() {

lock.lock();

try{

...

}finally{

lock.unlock();

}

}

Class level locking

make static data thread safe

public synchronized static void syncMethod(){...}

public void syncMethod(){

synchronized (SyncClass.class) {...}

}

private final static Object lock = new Object();

public void syncMethod(){

synchronized (lock) {...}

}

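※Use a static lock object to protect shared static data

Reason: with an instance-level lock, each thread's Runnable instance gets its own lock object, so the threads never exclude one another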

class MyThread implements Runnable {

private static int counter;

private static final Object lock = new Object();

public void run() {

synchronized(lock){

counter++;

...

}

}

}
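A runnable variant of the class above, as a sketch under assumed names (CountingTask and runAll are hypothetical): every thread gets its own task instance, yet all of them synchronize on the single static LOCK, so no increment of the static counter is lost.

```java
class CountingTask implements Runnable {
    private static int counter;
    private static final Object LOCK = new Object(); // one lock for the whole class
    // A non-static lock field here would give each task instance its own lock.

    private final int increments;

    CountingTask(int increments) {
        this.increments = increments;
    }

    @Override
    public void run() {
        for (int i = 0; i < increments; i++) {
            synchronized (LOCK) {
                counter++;
            }
        }
    }

    // Starts one task instance per thread and returns the final counter value.
    static int runAll(int threads, int increments) {
        synchronized (LOCK) {
            counter = 0;
        }
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(new CountingTask(increments));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        synchronized (LOCK) {
            return counter;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(4, 10_000)); // prints 40000
    }
}
```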

★lock vs synchronized

Synchronized = Lock + Condition

lock

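Under contention, the thread parks in the WAITING state (TIMED_WAITING when a timeout is used)

Fair or non-fair ordering can be chosen

Implemented at the code (library) level

Flexible and feature-rich

ReadWriteLock allows even finer-grained control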

synchronized

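Under contention, the thread is in the BLOCKED state

Non-fair only

Implemented at the JVM (runtime) level

Convenient and safe (the monitor is released automatically)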
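The difference in thread state can be observed with Thread.getState(). The sketch below (ContentionStates and its helpers are hypothetical names) holds a monitor or a fair ReentrantLock in the calling thread, starts a second thread that tries to acquire it, and polls until that thread stops running. Per the Thread.State documentation, waiting for a monitor is reported as BLOCKED and parking inside lock() as WAITING.

```java
import java.util.concurrent.locks.ReentrantLock;

class ContentionStates {
    // Polls (for up to 5 seconds) until the thread has left NEW/RUNNABLE.
    private static Thread.State awaitStalled(Thread t) {
        long deadline = System.nanoTime() + 5_000_000_000L;
        Thread.State state = t.getState();
        while ((state == Thread.State.NEW || state == Thread.State.RUNNABLE)
                && System.nanoTime() < deadline) {
            Thread.yield();
            state = t.getState();
        }
        return state;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // State of a thread that contends for an intrinsic lock (synchronized).
    static Thread.State stateOnMonitor() {
        final Object monitor = new Object();
        Thread contender = new Thread(() -> {
            synchronized (monitor) {
                // acquired only after the caller releases the monitor
            }
        });
        Thread.State state;
        synchronized (monitor) {
            contender.start();
            state = awaitStalled(contender);
        }
        join(contender);
        return state;
    }

    // State of a thread that contends for a ReentrantLock (fair mode here).
    static Thread.State stateOnLock() {
        final ReentrantLock lock = new ReentrantLock(true);
        Thread contender = new Thread(() -> {
            lock.lock();
            lock.unlock();
        });
        Thread.State state;
        lock.lock();
        try {
            contender.start();
            state = awaitStalled(contender);
        } finally {
            lock.unlock();
        }
        join(contender);
        return state;
    }

    public static void main(String[] args) {
        System.out.println("synchronized: " + stateOnMonitor());
        System.out.println("ReentrantLock: " + stateOnLock());
    }
}
```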

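★About volatile

Without volatile, there is no guarantee that a change made to a variable by one thread becomes visible to other threads promptly.

volatile does not make compound operations thread-safe; it guarantees only visibility (a read sees the most recent write), not atomicity.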

1.State flag

public class StopThread {

private static volatile boolean stopRequested = false;

public static void main(String[] args) throws InterruptedException {

Thread thread = new Thread(new Runnable() {

@Override

public void run() {

while(!stopRequested) {...};

}

});

thread.start();

TimeUnit.SECONDS.sleep(1);

stopRequested = true;

}

}

2. Simple counter

private volatile int count = 0;

public synchronized void increment() {

count++;

}

public int getCount() {

return count;

}

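Note: ++ is not an atomic operation (read, add, write), so without synchronized on increment() concurrent calls could lose updates or produce duplicate values. volatile only makes the unlocked read in getCount() safe.

Improved version (usually faster than synchronized)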

private AtomicInteger count = new AtomicInteger();

public void increment() {

count.incrementAndGet(); // getAndIncrement() is also available

}

public int getCount() {

return count.get();

}

3. Other (lazy initialization with double-checked locking)

private volatile IoBufferAllocator instance;

public IoBufferAllocator getInstance(){

if(instance==null){

synchronized (IoBufferAllocator.class) {

if(instance==null)

instance=new IoBufferAllocator();

}

}

return instance;

}

★About ThreadLocal

A mechanism that holds exactly one object per thread

public class Test {

private static ThreadLocal<Integer> tl = new ThreadLocal<Integer>();

public static void main(String[] args) {

for(int i = 0; i < 2; i++){

new Thread( new Runnable(){

@Override

public void run() {

int data = new Random().nextInt();

System.out.println(Thread.currentThread().getName() + " set:" + data);

// store the data for the currently running thread

tl.set(data); // simple value

Target target = Target.getThreadInstance(); // complex object

target.setData(data);

get();

}

}).start();

}

}

public static void get(){

int data = tl.get();

System.out.println(Thread.currentThread().getName() + " get:" + data);

Target target = Target.getThreadInstance();

System.out.println(Thread.currentThread().getName() + " getData : " + target.getData());

}

}

class Target {

private Target(){}

// synchronized is not needed: each thread only touches its own ThreadLocal slot

public static /*synchronized*/ Target getThreadInstance(){

Target instance = map.get();

if(instance == null){

instance = new Target();

map.set(instance);

}

return instance;

}

private static ThreadLocal<Target> map = new ThreadLocal<Target>();

private int data;

// getData/setData

}

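Usage example 1

private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<>() {

@Override

protected Connection initialValue() {

try {

return DriverManager.getConnection(DB_URL);

} catch (SQLException e) {

// initialValue() cannot throw checked exceptions, so wrap it

throw new IllegalStateException(e);

}

}

};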

public static Connection getConnection() {

return connectionHolder.get();

}

Usage example 2

private static final ThreadLocal<Session> threadSession = new ThreadLocal<>();

public static Session getSession() throws InfrastructureException {

Session session = threadSession.get();

try {

if (session == null) {

session = getSessionFactory().openSession();

threadSession.set(session);

}

} catch (HibernateException ex) {

throw new InfrastructureException(ex);

}

return session;

}
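When threads are pooled, a value left in a ThreadLocal survives into the next task that runs on the same thread. The following is a minimal sketch, assuming a hypothetical ThreadLocalCleanup class and a made-up user name "alice": two tasks run back to back on a single-thread executor, and the second one sees the first task's value unless remove() was called.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalCleanup {
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Runs two tasks on the same pooled thread and returns what the second sees.
    static String secondTaskSees(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable first = () -> {
                try {
                    CURRENT_USER.set("alice");
                    // ... work on behalf of the current user ...
                } finally {
                    if (cleanUp) {
                        CURRENT_USER.remove(); // drop the value before the thread is reused
                    }
                }
            };
            Callable<String> second = CURRENT_USER::get;
            pool.submit(first).get();          // wait for the first task to finish
            return pool.submit(second).get();  // same worker thread runs the second task
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTaskSees(false)); // prints alice (stale value)
        System.out.println(secondTaskSees(true));  // prints null
    }
}
```

Calling remove() in a finally block is a common way to keep per-thread sessions or connections from leaking between requests.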

synchronized

private final Object lock = new Object();

private Queue<Object> linkedList = new LinkedList<Object>();

private int maxLength = 10;

public Object take() throws InterruptedException {

synchronized (lock) {

// a single monitor guards the queue; waiting on one monitor while holding another can deadlock

while (linkedList.size() == 0) {

lock.wait();

}

Object item = linkedList.poll();

lock.notifyAll(); // wake producers waiting for space

return item;

}

}

public void offer(Object object) throws InterruptedException {

synchronized (lock) {

// re-check in a loop to handle spurious wakeups

while (linkedList.size() == maxLength) {

lock.wait();

}

linkedList.add(object);

lock.notifyAll(); // wake consumers waiting for an item

}

}

lock

private Lock lock = new ReentrantLock();

private Condition notEmpty = lock.newCondition();

private Condition notFull = lock.newCondition();

private Queue<Object> linkedList = new LinkedList<Object>();

private int maxLength = 10;

public Object take() throws InterruptedException {

lock.lock();

try {

// re-check in a loop to handle spurious wakeups

while (linkedList.size() == 0) {

notEmpty.await();

}

Object item = linkedList.poll();

notFull.signal(); // one slot was freed

return item;

} finally {

lock.unlock();

}

}

public void offer(Object object) throws InterruptedException {

lock.lock();

try {

while (linkedList.size() == maxLength) {

notFull.await();

}

linkedList.add(object);

notEmpty.signal(); // one item became available

} finally {

lock.unlock();

}

}

wait / notify

private final Object lock = new Object();

private volatile boolean flag = false;

public void waitTillChange() {

synchronized(lock) {

while(!flag)

try {

lock.wait();

} catch(InterruptedException e) {...}

}

}

public void change() {

synchronized(lock) {

flag = true;

lock.notifyAll();

}

}

Condition

private final Lock lock = new ReentrantLock();

private final Condition condition = lock.newCondition();

private volatile boolean flag = false;

public void waitTillChange() throws InterruptedException {

lock.lock();

try {

while(!flag) {

condition.await();

}

} finally {

lock.unlock();

}

}

public void change() {

lock.lock();

try {

flag = true;

condition.signalAll();

} finally {

lock.unlock();

}

}

★Lock-Free = loop + CAS + break

To avoid locks, prefer data structures built on CAS

・AtomicXXX, ConcurrentMap, ConcurrentLinkedQueue, etc. (CopyOnWriteArrayList gives lock-free reads, but its writes take a lock)

private volatile int max = 0;

public synchronized void set(int value) {

if (value > max) {

max = value;

}

}

public int getMax() {

return max;

}

private AtomicInteger max = new AtomicInteger();

// Style 1

public void set(int value) {

for (;;) {

int current = max.get();

if (value > current) {

if (max.compareAndSet(current, value)) {

break;

} else {

continue;

}

} else {

break;

}

}

}

// Style 2

public void set(int value) {

int current;

do {

current = max.get();

if (value <= current) {

break;

}

} while (!max.compareAndSet(current, value));

}

public int getMax() {

return max.get();

}
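Since Java 8, the same retry loop is also available as AtomicInteger.accumulateAndGet(), which applies a function and retries on CAS failure internally. A sketch under assumed names (MaxTracker and maxOf are hypothetical), keeping the initial value 0 used above:

```java
import java.util.concurrent.atomic.AtomicInteger;

class MaxTracker {
    private final AtomicInteger max = new AtomicInteger();

    // Equivalent to the hand-written CAS loop: keeps the larger of the two values.
    void set(int value) {
        max.accumulateAndGet(value, Math::max);
    }

    int getMax() {
        return max.get();
    }

    // Thread t reports the values t*perThread .. t*perThread + perThread - 1.
    static int maxOf(int threads, int perThread) {
        MaxTracker tracker = new MaxTracker();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            workers[t] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    tracker.set(base + j);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return tracker.getMax();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(4, 1_000)); // prints 3999
    }
}
```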

★Deadlock avoidance

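1. Avoid sharing resources

2.Spin Lock

try{

if(lock.tryLock(2, TimeUnit.SECONDS)){

try{

// do something

}finally{

// unlock only when the lock was actually acquired

lock.unlock();

}

}

}catch(InterruptedException e){

Thread.currentThread().interrupt();

}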

3. Phaser (JDK 7)

4.Lock splitting

5.Lock ordering

6.Lock timeout
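Lock ordering (item 5) can be sketched as follows. The OrderedTransfer and Account classes are hypothetical and assume every account has a unique id: both locks are always taken in ascending id order, so two opposite transfers can never each hold one lock while waiting for the other.

```java
class OrderedTransfer {
    static final class Account {
        final int id;     // assumed unique; used only to order lock acquisition
        long balance;

        Account(int id, long balance) {
            this.id = id;
            this.balance = balance;
        }
    }

    static void transfer(Account from, Account to, long amount) {
        // Always lock the account with the smaller id first.
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // Two threads transfer in opposite directions `rounds` times each.
    static long[] run(int rounds) {
        Account a = new Account(1, 1_000);
        Account b = new Account(2, 1_000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < rounds; i++) {
                transfer(a, b, 1);
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < rounds; i++) {
                transfer(b, a, 1);
            }
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] { a.balance, b.balance };
    }

    public static void main(String[] args) {
        long[] balances = run(10_000);
        System.out.println(balances[0] + " " + balances[1]); // prints 1000 1000
    }
}
```

If each transfer locked `from` before `to` instead, the two threads could acquire the locks in opposite orders and deadlock.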

★Performance tests of the various locks

※See the attached file for the test results

CODE

Usage example 3

public class Task implements Runnable {

private static final AtomicInteger nextId = new AtomicInteger(0);

private static final ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() {

@Override

protected Integer initialValue() {

return nextId.getAndIncrement();

}

};

public int getThreadId() {

return threadId.get();

}

private static final ThreadLocal<Date> startDate = new ThreadLocal<Date>() {

protected Date initialValue() {

return new Date();

}

};

@Override

public void run() {

System.out.printf("Thread started: %s : %s\n", getThreadId(), startDate.get());

// ...

System.out.printf("Thread ended: %s : %s\n", getThreadId(), startDate.get());

}

}