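Thread pools
Components of a thread pool
・Worker: a pooled thread, either Runnable or Waiting
・Task: a Callable or a Runnable
・Work Queue: an implementation class of the BlockingQueue interface
Thread states in general: New, Runnable, Blocked, Waiting, Timed Waiting, Terminated
Thread states inside a pool: Runnable, Waiting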
Number of CPUs: int N_CPU = Runtime.getRuntime().availableProcessors();
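Number of threads
Formula 1: N + 1 (commonly used for CPU-bound tasks)
Formula 2: N * U * (1 + W/C) (for tasks that also spend time waiting, e.g. on I/O)
※ N: number of CPUs, U: target CPU utilization (0 < U <= 1), W/C: ratio of wait time to compute time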
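The two formulas above can be turned into a small helper. This is only a sketch: the class name PoolSizing, the method names, and the choice to round up are assumptions made for illustration, not part of any library.

```java
final class PoolSizing {
    private PoolSizing() {}

    /** Formula 1: N + 1. */
    static int cpuBound(int nCpu) {
        return nCpu + 1;
    }

    /** Formula 2: N * U * (1 + W/C), rounded up (the rounding is an assumption). */
    static int withWaiting(int nCpu, double utilization, double waitTime, double computeTime) {
        if (utilization <= 0 || utilization > 1 || computeTime <= 0) {
            throw new IllegalArgumentException("need 0 < U <= 1 and C > 0");
        }
        double size = nCpu * utilization * (1 + waitTime / computeTime);
        return Math.max(1, (int) Math.ceil(size));
    }

    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();
        System.out.println("formula 1: " + cpuBound(nCpu));
        // Hypothetical workload: 90 ms waiting per 10 ms of computation, 50% target utilization.
        System.out.println("formula 2: " + withWaiting(nCpu, 0.5, 90, 10));
    }
}
```

For example, with N = 4, U = 0.5 and W/C = 9, formula 2 gives 4 * 0.5 * 10 = 20 threads.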
General example
ExecutorService es = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
    es.submit(new Runnable() { // es.execute() works as well
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
            // do something
        }
    });
}
es.shutdown();
・ExecutorService: Future<?> submit(Runnable task); returns a Future
・Executor: void execute(Runnable command); returns nothing
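The difference between the two calls can be seen in a short sketch. The class name SubmitVsExecute and the counter are made up for the example; the timeouts are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SubmitVsExecute {
    private SubmitVsExecute() {}

    /** Runs the same task once via execute() and once via submit(); returns how often it ran. */
    static int runBoth() throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(2);
        AtomicInteger counter = new AtomicInteger();
        Runnable task = () -> counter.incrementAndGet();
        try {
            // execute(): fire and forget, the caller gets nothing back.
            es.execute(task);
            // submit(): the Future lets the caller wait for completion (or cancel).
            Future<?> handle = es.submit(task);
            handle.get(5, TimeUnit.SECONDS); // yields null for a Runnable that finished normally
        } finally {
            es.shutdown();
        }
        es.awaitTermination(5, TimeUnit.SECONDS);
        return counter.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("task ran " + runBoth() + " times");
    }
}
```

An exception thrown inside a submitted task is captured and rethrown by Future.get() wrapped in an ExecutionException, whereas with execute() it reaches the worker thread's uncaught-exception handler.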
ExecutorService
Future<?> submit(Runnable task)
<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
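A minimal sketch of invokeAll and invokeAny from the list above. The class InvokeDemo and its two methods are hypothetical helpers written for this note.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvokeDemo {
    private InvokeDemo() {}

    /** invokeAll: waits for every task; the futures come back in task order. */
    static List<Integer> squares(List<Integer> inputs) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer n : inputs) {
                tasks.add(() -> n * n);
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : es.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } finally {
            es.shutdown();
        }
    }

    /** invokeAny: returns the result of one task that completed successfully. */
    static String anyOf(List<String> names) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String name : names) {
                tasks.add(() -> name);
            }
            return es.invokeAny(tasks);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squares(Arrays.asList(1, 2, 3)));
        System.out.println(anyOf(Arrays.asList("a", "b", "c")));
    }
}
```

Which element invokeAny returns depends on scheduling, so callers should not rely on a particular one.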
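Java 7 and earlier (no lambdas, so the task is an anonymous Runnable)
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread());
        }
    });
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS); // throws InterruptedException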
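Java 8 and later
doParallel(5, () -> System.out.println(Thread.currentThread()));
// Runs the task `times` times on a parallel stream (backed by the common ForkJoinPool)
private void doParallel(int times, Runnable task) {
    IntStream.range(0, times).parallel().forEach(i -> task.run());
}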
Types of thread pools
〇newSingleThreadExecutor: processes tasks on a single thread
Definition:
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
〇newCachedThreadPool: creates threads automatically as needed to process tasks
Definition:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
〇newFixedThreadPool: creates the specified number of threads to process tasks
Definition:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
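To see how the pool types differ in practice, the sketch below records which worker threads actually ran a batch of tasks. PoolTypesDemo and workerNames are hypothetical names, and the exact thread names printed depend on the JDK's default thread factory.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PoolTypesDemo {
    private PoolTypesDemo() {}

    /** Runs taskCount trivial tasks on the pool, shuts it down, and returns the worker thread names seen. */
    static Set<String> workerNames(ExecutorService es, int taskCount) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            for (int i = 0; i < taskCount; i++) {
                es.execute(() -> names.add(Thread.currentThread().getName()));
            }
        } finally {
            es.shutdown();
        }
        es.awaitTermination(5, TimeUnit.SECONDS);
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        // Exactly one worker thread.
        System.out.println(workerNames(Executors.newSingleThreadExecutor(), 5));
        // At most two worker threads, however many tasks are queued.
        System.out.println(workerNames(Executors.newFixedThreadPool(2), 5));
        // As many threads as needed; idle ones are reused.
        System.out.println(workerNames(Executors.newCachedThreadPool(), 5));
    }
}
```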
〇newSingleThreadScheduledExecutor
〇newScheduledThreadPool
Executors.newScheduledThreadPool(2).scheduleAtFixedRate(
    new Runnable() {
        @Override
        public void run() {
            System.out.println("AAA");
        }
    },
    5,                  // first run after 5 seconds
    2,                  // then every 2 seconds
    TimeUnit.SECONDS);  // time unit
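The snippet above never stops its schedule. One possible way to run a fixed number of ticks and then clean up is sketched below; FixedRateDemo, runTicks, and the millisecond periods are assumptions chosen to keep the example short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedRateDemo {
    private FixedRateDemo() {}

    /** Schedules a periodic task, waits for at least `ticks` runs, then cancels it. */
    static int runTicks(int ticks, long periodMillis) throws InterruptedException {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);
        CountDownLatch latch = new CountDownLatch(ticks);
        AtomicInteger count = new AtomicInteger();
        // Keep the ScheduledFuture so the periodic task can be cancelled later.
        ScheduledFuture<?> handle = ses.scheduleAtFixedRate(() -> {
            count.incrementAndGet();
            latch.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } finally {
            handle.cancel(false); // stop future runs without interrupting a running one
            ses.shutdown();
        }
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("ran " + runTicks(3, 10) + " times");
    }
}
```

The returned count can exceed the requested number, because another tick may fire between the latch reaching zero and the cancel call.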
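For reference, the constructor of the underlying ThreadPoolExecutor:
public ThreadPoolExecutor(
        int corePoolSize,                   // core (minimum) number of threads
        int maximumPoolSize,                // maximum number of threads
        long keepAliveTime,                 // how long idle threads beyond the core size are kept alive
        TimeUnit unit,                      // time unit of keepAliveTime
        BlockingQueue<Runnable> workQueue,  // queue of pending tasks
        ThreadFactory threadFactory,        // factory that creates the worker threads
        RejectedExecutionHandler handler) { // policy for tasks that cannot be accepted (queue full and maximum threads reached)
    ...
}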
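How the pool size, the work queue, and the rejection handler interact can be shown with a deliberately tiny pool. This is a sketch with made-up names (RejectionDemo, thirdTaskRejected): one thread, a queue of capacity one, and AbortPolicy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionDemo {
    private RejectionDemo() {}

    /** Returns true if the third task is rejected by a 1-thread pool with a 1-slot queue. */
    static boolean thirdTaskRejected() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so the worker and the queue stay occupied.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // taken by the single worker thread
            pool.execute(blocker); // parked in the queue (capacity 1)
            try {
                pool.execute(blocker); // no free thread and the queue is full
                return false;
            } catch (RejectedExecutionException e) {
                return true; // AbortPolicy rejects by throwing
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("third task rejected: " + thirdTaskRejected());
    }
}
```

Swapping AbortPolicy for ThreadPoolExecutor.CallerRunsPolicy would instead run the overflowing task on the submitting thread.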
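Prefer ThreadPoolExecutor over the Executors factory methods: the factories shown above use an unbounded LinkedBlockingQueue or allow up to Integer.MAX_VALUE threads, whereas calling the constructor directly lets you bound both.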
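// ThreadFactoryBuilder is not part of the JDK; this appears to be Guava's
// com.google.common.util.concurrent.ThreadFactoryBuilder
private static ThreadFactory threadFactory =
        new ThreadFactoryBuilder()
                .setNameFormat("myname" + "-%d")
                .setDaemon(true)
                .build();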
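// FixedThreadPool
// queueSize is assumed to be an int defined elsewhere (capacity of the bounded queue)
public static ExecutorService createFixedThreadPool() {
    return new ThreadPoolExecutor(
            5,                  // corePoolSize
            10,                 // maximumPoolSize: threads beyond 5 are created only when the queue is full
            0L,                 // keepAliveTime
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(queueSize),
            threadFactory,
            new ThreadPoolExecutor.AbortPolicy()); // throws RejectedExecutionException on overflow
}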
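// CachedThreadPool
public static ExecutorService createCacheThreadPool() {
    return new ThreadPoolExecutor(
            10,                 // corePoolSize
            20,                 // maximumPoolSize: unlike newCachedThreadPool, the thread count is bounded
            10L,                // keepAliveTime for idle threads beyond the core size
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(), // no buffering: each task is handed straight to a thread
            threadFactory,
            new ThreadPoolExecutor.AbortPolicy());
}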
// ScheduledThreadPool
private static CountDownLatch latch = new CountDownLatch(1);
Runnable task = () -> System.out.println(Thread.currentThread().getName() + " executing");
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, threadFactory);
executorService.scheduleAtFixedRate(task, 0L, 5L, TimeUnit.SECONDS); // run immediately, then every 5 seconds
latch.await(); // never counted down: blocks the caller so the daemon worker threads keep running
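The threadFactory used in these snippets is built with ThreadFactoryBuilder, which appears to be Guava's class rather than part of the JDK. If that dependency is not available, a JDK-only stand-in might look like the sketch below. NamedDaemonThreadFactory is a hypothetical class written for this note; it only mimics the "myname-%d" naming and the daemon flag.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedDaemonThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Names threads prefix-0, prefix-1, ... and marks them as daemon threads.
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedDaemonThreadFactory("myname");
        Thread t = factory.newThread(() -> System.out.println(Thread.currentThread().getName()));
        t.start();
    }
}
```

An instance can be passed wherever the snippets above pass threadFactory, for example as the second argument of ScheduledThreadPoolExecutor.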