QueuesとDeques
Queues
BlockingQueue (Interface)
PriorityQueue
ArrayBlockingQueue 配列
LinkedBlockingQueue リスト FIFO Blocking
ConcurrentLinkedQueue リスト FIFO
PriorityBlockingQueue
DelayQueue
SynchronousQueue
QueueとBlockingQueue
Queueインターフェース(ブロックメソッドなし)
boolean offer(E e) キューに挿入
element() キューの先頭を取得。キューがnullの場合、例外発生
peek() キューの先頭を取得。キューがnullの場合、nullを返す
remove() キューの先頭を削除。キューがnullの場合、例外発生
poll() キューの先頭を削除。キューがnullの場合、nullを返す
BlockingQueueインターフェース(ブロックメソッドあり)
boolean offer(E e, long timeout, TimeUnit unit) 挿入。キューが使用できない場合は指定時間で待機
E poll(long timeout, TimeUnit unit) キューの先頭を削除。キューが空の場合は指定時間で待機
put(E) キューに追加。空間が利用可能になるまで待機
E take() キューの先頭を削除。キューが空の場合は待機
remainingCapacity()
drainTo(Collection<? super E>) : int
drainTo(Collection<? super E>, int ) : int
BlockingQueue実装
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue sizeは0
PriorityBlockingQueue
TransferQueue JDK 7にSynchronousQueueより速い
その他
CompletionService BlockingQueue + Executor
public class Producer extends Thread{
private final BlockingQueue<String> queue;
private final Map<String, Integer> calendarNames;
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
Calendar now = Calendar.getInstance();
Locale locale = Locale.getDefault();
this.calendarNames =
now.getDisplayNames(Calendar.MONTH, Calendar.ALL_STYLES, locale);
System.out.println(this.calendarNames);
}
@Override
public void run() {
try {
for (String key : this.calendarNames.keySet()) {
this.queue.put(key);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.queue);
}
}
public class Consumer extends Thread{
private final BlockingQueue<String> queue;
public Consumer(String name, BlockingQueue<String> queue) {
this.queue = queue;
super.setName(name);
}
@Override
public void run() {
String value;
try {
while(true){
value = queue.poll(30, TimeUnit.MILLISECONDS);
System.out.println(super.getName() + " Poll: " + value);
if(value == null){
return;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 利用側
BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
Producer producer = new Producer(queue);
Consumer consumer1 = new Consumer("Consumer1", queue);
Consumer consumer2 = new Consumer("Consumer2", queue);
producer.start();
consumer1.start();
consumer2.start();
try {
producer.join();
consumer1.join();
consumer2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
★UnblockingとBlocking
Unblocking:容量は拡張できる、データを格納するのは目的
List<String> list = new ArrayList<String>(5);
for(int i = 0; i < 10; i++){ ←初期容量を超えてもOK
list.add("a");
}
Blocking:容量は固定(指定しない場合はIntegerの最大値)、マルチスレッドを格納するのは目的
BlockingQueue<String> bq = new ArrayBlockingQueue<String>(5);
for(int i = 0; i < 10; i++){ ←初期容量を超えたらエラー発生
bq.add("a");
}
Deques
BlockingDeque (Interface)
LinkedList
ArrayDeque 配列
LinkedBlockingDeque リスト FIFO/FILO Blocking
ConcurrentLinkedDeque リスト FIFO/FILO
DequeとBlockingDeque
Dequeインターフェース
先頭メソッド
addFirst(E e) キューに挿入。キューがnullの場合、例外発生
removeFirst() キューの先頭を削除。キューがnullの場合、例外発生
getFirst() キューの先頭を取得。キューがnullの場合、例外発生
offerFirst(E e) キューに挿入。キューがnullの場合、nullを返す
pollFirst() キューの先頭を削除。キューがnullの場合、nullを返す
peekFirst() キューの先頭を取得。キューがnullの場合、nullを返す
※末尾も同じ。XXXLast
そのほかに、Stack-likeメソッド
push(E e) = addFirst(E e)
pop() = removeFirst()
peek() = peekFirst()
BlockingDequeインターフェース
先頭メソッド
putFirst(E e, long timeout, TimeUnit unit) キューに挿入。キューが使用できない場合は指定時間で待機
takeFirst(E e, long timeout, TimeUnit unit) キューの先頭を削除。キューが空の場合は指定時間で待機
putFirst(E e) キューに追加。空間が利用可能になるまで待機
takeFirst() キューの先頭を削除。キューが空の場合は待機
※末尾も同じ。XXXLast
public class Producer extends Thread{
private final BlockingDeque<String> deque;
private final Map<String, Integer> calendarNames;
public Producer(BlockingDeque<String> deque) {
this.deque = deque;
Calendar now = Calendar.getInstance();
Locale locale = Locale.getDefault();
this.calendarNames =
now.getDisplayNames(Calendar.MONTH, Calendar.ALL_STYLES, locale);
System.out.println(this.calendarNames);
}
@Override
public void run() {
for (String key : this.calendarNames.keySet()) {
if(this.calendarNames.size() % 2 == 1){
this.deque.addFirst(key);
}else{
this.deque.addLast(key);
}
}
System.out.println(this.deque);
}
}
public class Consumer extends Thread{
private final BlockingDeque<String> deque;
public Consumer(String name, BlockingDeque<String> deque) {
this.deque = deque;
super.setName(name);
}
@Override
public void run() {
String value;
int counter = 1;
try {
while(true){
if(counter % 2 == 1){
value = deque.pollLast(30, TimeUnit.MILLISECONDS);
System.out.println(super.getName() + " PollLast: " + value);
}else{
value = deque.pollFirst(30, TimeUnit.MILLISECONDS);
System.out.println(super.getName() + " PollFirst: " + value);
}
counter++;
if(value == null){
return;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 利用側
BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
Producer producer = new Producer(deque);
Consumer consumer1 = new Consumer("Consumer1", deque);
Consumer consumer2 = new Consumer("Consumer2", deque);
producer.start();
consumer1.start();
consumer2.start();
try {
producer.join();
consumer1.join();
consumer2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}