スレッドプール

スレッドプールの構成

・Worker RunnableかWaitingか

・Task CallableかRunnableか

・Work Queue BlockingQueueインタフェースの実装クラス

一般のスレッド状態 New, Runnable, Blocked, Waiting, Terminated

スレッドプール中の状態 Runnable, Waiting

CPUの数:int N_CPU = Runtime.getRuntime().availableProcessors();

スレッド数

式1:N + 1

式2:N * U * (1 + W/C)

※N:CPUの数、U:CPU稼働率(0<U<1)、W/C:待ち時間と計算時間の比率

一般サンプル

ExecutorService es = Executors.newFixedThreadPool(2);

for (int i = 0; i < 5; i++) {

es.submit(new Runnable() { // es.execute()でもよい

@Override

public void run() {

System.out.println(Thread.currentThread().getName());

// do something

}

});

}

es.shutdown();

・ExecutorServiceのFuture<?> submit(Runnable task); 戻り値あり

・Executorのvoid execute(Runnable command); 戻り値なし

ExecutorService

Future<?> submit(Runnable task)

Future<T> submit(Callable<T> task)

Future<T> submit(Runnable task, T result)

List<Future<T> invokeAll(Collection<? extends Callable<T>> tasks)

List<Future<T> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

T invokeAny(Collection<? extends Callable<T>> tasks)

T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

Java 7以前

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 5; i++) {

executor.submit(() -> out.println(Thread.currentThread()));

}

executor.shutdown();

executor.awaitTermination(1, TimeUnit.SECONDS);

Java 8以降

doParallel(5, () -> out.println(Thread.currentThread()));

private void doParallel(int times, Runnable task) {

IntStream.range(0, times).parallel().forEach(i -> task.run());

}

スレッドプールの種類

〇newSingleThreadExecutor 一つのスレッドでタスクの処理を行う

定義:

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>()));

}

〇newCachedThreadPool 必要に応じて自動的にスレッドを作成し、タスクの処理を行う

定義:

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue<Runnable>());

}

〇newFixedThreadPool 指定した数のスレッドを作成し、タスクの処理を行う

定義:

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

〇newSingleThreadScheduledExecutor

〇newScheduledThreadPool

Executors.newScheduledThreadPool(2).scheduleAtFixedRate(

new Runnable() {

@Override

public void run() {

System.out.println("AAA");

}

},

5, // 初回は5秒後に実行

2, // 以降は2秒おきに実行

TimeUnit.SECONDS); // 単位

ちなみに、コアThreadPoolExecutorの定義:

public ThreadPoolExecutor(

int corePoolSize, 最小スレッド数

int maximumPoolSize, 最大スレッド数

long keepAliveTime, スレッドのライフ時間

TimeUnit unit, 時間単位

BlockingQueue<Runnable> workQueue, タスクキュー

ThreadFactory threadFactory, スレッドの起動

RejectedExecutionHandler handler) { 最大スレッド数を超えたタスクに対して

...

}

Executorsの代わりにThreadPoolExecutorを使うべき

private static ThreadFactory threadFactory =

new ThreadFactoryBuilder()

.setNameFormat("myname" + "-%d")

.setDaemon(true)

.build();

// FixedThreadPool

public static ExecutorService createFixedThreadPool() {

return new ThreadPoolExecutor(

5,

10,

0L,

TimeUnit.SECONDS,

new ArrayBlockingQueue<Runnable>(queueSize),

threadFactory,

new ThreadPoolExecutor.AbortPolicy());

}

// CacheThreadPool

public static ExecutorService createCacheThreadPool(){

return new ThreadPoolExecutor(

10,

20,

10L,

TimeUnit.SECONDS,

new SynchronousQueue<Runnable>(),

threadFactory,

new ThreadPoolExecutor.AbortPolicy());

}

// ScheduledThreadPool

private static CountDownLatch latch = new CountDownLatch(1);

Runnable task = () -> out.println(Thread.currentThread().getName() + "executing");

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, threadFactory);

executorService.scheduleAtFixedRate(task, 0L, 5L, TimeUnit.SECONDS);

latch.await();