QueuesとDeques

Queues

BlockingQueue (Interface)

PriorityQueue

ArrayBlockingQueue 配列

LinkedBlockingQueue リスト FIFO Blocking

ConcurrentLinkedQueue リスト FIFO

PriorityBlockingQueue

DelayQueue

SynchronousQueue

QueueとBlockingQueue

Queueインターフェース(ブロックメソッドなし)

boolean offer(E e) キューに挿入

element() キューの先頭を取得。キューがnullの場合、例外発生

peek() キューの先頭を取得。キューがnullの場合、nullを返す

remove() キューの先頭を削除。キューがnullの場合、例外発生

poll() キューの先頭を削除。キューがnullの場合、nullを返す

BlockingQueueインターフェース(ブロックメソッドあり)

boolean offer(E e, long timeout, TimeUnit unit) 挿入。キューが使用できない場合は指定時間で待機

E poll(long timeout, TimeUnit unit) キューの先頭を削除。キューが空の場合は指定時間で待機

put(E) キューに追加。空間が利用可能になるまで待機

E take() キューの先頭を削除。キューが空の場合は待機

remainingCapacity()

drainTo(Collection<? super E>) : int

drainTo(Collection<? super E>, int ) : int

BlockingQueue実装

ArrayBlockingQueue

LinkedBlockingQueue

SynchronousQueue sizeは0

PriorityBlockingQueue

TransferQueue JDK 7にSynchronousQueueより速い

その他

CompletionService BlockingQueue + Executor

public class Producer extends Thread{

private final BlockingQueue<String> queue;

private final Map<String, Integer> calendarNames;

public Producer(BlockingQueue<String> queue) {

this.queue = queue;

Calendar now = Calendar.getInstance();

Locale locale = Locale.getDefault();

this.calendarNames =

now.getDisplayNames(Calendar.MONTH, Calendar.ALL_STYLES, locale);

System.out.println(this.calendarNames);

}

@Override

public void run() {

try {

for (String key : this.calendarNames.keySet()) {

this.queue.put(key);

}

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(this.queue);

}

}

public class Consumer extends Thread{

private final BlockingQueue<String> queue;

public Consumer(String name, BlockingQueue<String> queue) {

this.queue = queue;

super.setName(name);

}

@Override

public void run() {

String value;

try {

while(true){

value = queue.poll(30, TimeUnit.MILLISECONDS);

System.out.println(super.getName() + " Poll: " + value);

if(value == null){

return;

}

}

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

// 利用側

BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

Producer producer = new Producer(queue);

Consumer consumer1 = new Consumer("Consumer1", queue);

Consumer consumer2 = new Consumer("Consumer2", queue);

producer.start();

consumer1.start();

consumer2.start();

try {

producer.join();

consumer1.join();

consumer2.join();

} catch (InterruptedException e) {

e.printStackTrace();

}

★UnblockingとBlocking

Unblocking:容量は拡張できる、データを格納するのは目的

List<String> list = new ArrayList<String>(5);

for(int i = 0; i < 10; i++){ ←初期容量を超えてもOK

list.add("a");

}

Blocking:容量は固定(指定しない場合はIntegerの最大値)、マルチスレッドを格納するのは目的

BlockingQueue<String> bq = new ArrayBlockingQueue<String>(5);

for(int i = 0; i < 10; i++){ ←初期容量を超えたらエラー発生

bq.add("a");

}

Deques

BlockingDeque (Interface)

LinkedList

ArrayDeque 配列

LinkedBlockingDeque リスト FIFO/FILO Blocking

ConcurrentLinkedDeque リスト FIFO/FILO

DequeとBlockingDeque

Dequeインターフェース

先頭メソッド

addFirst(E e) キューに挿入。キューがnullの場合、例外発生

removeFirst() キューの先頭を削除。キューがnullの場合、例外発生

getFirst() キューの先頭を取得。キューがnullの場合、例外発生

offerFirst(E e) キューに挿入。キューがnullの場合、nullを返す

pollFirst() キューの先頭を削除。キューがnullの場合、nullを返す

peekFirst() キューの先頭を取得。キューがnullの場合、nullを返す

※末尾も同じ。XXXLast

そのほかに、Stack-likeメソッド

push(E e) = addFirst(E e)

pop() = removeFirst()

peek() = peekFirst()

BlockingDequeインターフェース

先頭メソッド

putFirst(E e, long timeout, TimeUnit unit) キューに挿入。キューが使用できない場合は指定時間で待機

takeFirst(E e, long timeout, TimeUnit unit) キューの先頭を削除。キューが空の場合は指定時間で待機

putFirst(E e) キューに追加。空間が利用可能になるまで待機

takeFirst() キューの先頭を削除。キューが空の場合は待機

※末尾も同じ。XXXLast

public class Producer extends Thread{

private final BlockingDeque<String> deque;

private final Map<String, Integer> calendarNames;

public Producer(BlockingDeque<String> deque) {

this.deque = deque;

Calendar now = Calendar.getInstance();

Locale locale = Locale.getDefault();

this.calendarNames =

now.getDisplayNames(Calendar.MONTH, Calendar.ALL_STYLES, locale);

System.out.println(this.calendarNames);

}

@Override

public void run() {

for (String key : this.calendarNames.keySet()) {

if(this.calendarNames.size() % 2 == 1){

this.deque.addFirst(key);

}else{

this.deque.addLast(key);

}

}

System.out.println(this.deque);

}

}

public class Consumer extends Thread{

private final BlockingDeque<String> deque;

public Consumer(String name, BlockingDeque<String> deque) {

this.deque = deque;

super.setName(name);

}

@Override

public void run() {

String value;

int counter = 1;

try {

while(true){

if(counter % 2 == 1){

value = deque.pollLast(30, TimeUnit.MILLISECONDS);

System.out.println(super.getName() + " PollLast: " + value);

}else{

value = deque.pollFirst(30, TimeUnit.MILLISECONDS);

System.out.println(super.getName() + " PollFirst: " + value);

}

counter++;

if(value == null){

return;

}

}

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

// 利用側

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

Producer producer = new Producer(deque);

Consumer consumer1 = new Consumer("Consumer1", deque);

Consumer consumer2 = new Consumer("Consumer2", deque);

producer.start();

consumer1.start();

consumer2.start();

try {

producer.join();

consumer1.join();

consumer2.join();

} catch (InterruptedException e) {

e.printStackTrace();

}