17.9 ワーカー

Scala における 計算サーバー の実装例を見てみましょう。サーバーは future メソッドを実装し、与えられた式をその呼び出し者と並列に評価します。 17.3節の実装とは異なり、サーバーはスレッドの事前定義された数字だけを持って future を計算します。サーバーの一つのあり得る実装では、別々のプロセッサ上で各スレッドを走らせることができ、シングルプロセッサ上で複数のスレッドを切り替 える際のコンテキストスイッチングに伴うオーバーヘッドを回避できます。

import scala.concurrent._, scala.concurrent.ops._ class ComputeServer(n: Int) { private abstract class Job { type T def task: T def ret(x: T) } private val openJobs = new Channel[Job]() private def processor(i: Int) { while (true) { val job = openJobs.read job.ret(job.task) } } def future[A](p: => A): () => A = { val reply = new SyncVar[A]() openJobs.write{ new Job { type T = A def task = p def ret(x: A) = reply.set(x) } } () => reply.get } spawn(replicate(0, n) { processor }) }

計算すべき式(つまり、future を呼び出すときの引数)は openJobs チャネルに書かれます。 ジョブ (job) は次を備えた オブジェクトです。

    • job の計算結果を表す抽象型 T

    • 計算すべき式を表す、パラメータのない、型 T の taskメソッド

    • いちど計算された結果をとる ret メソッド

計算サーバーはその初期化中に n 個の processor プロセスを生成します。各プロセスは未処理の jobを繰り返し取り出し、job の task メソッドを評価して job の ret メソッドに結果を渡します。多相的な future メソッドは、reply という名前のガードを使って ret メソッドを実装している新しい job を生成し、その job を未処理 job の集合に入れます。そして、対応する reply ガードが呼ばれるまでウェイトします。

この例は抽象型の使い方を示しています。抽象型 T は、job の結果型を記録し、異なる job に対しては異なる型をとることもできます。抽象型がなければ、ユーザは型キャストと動的な型テストへの信頼なしに、静的に型安全な方法で同じクラスを作る ことはできないでしょう。次に、式 41 + 1 を評価する計算サーバーを使うコードを示します。

object Test with Executable { val server = new ComputeServer(1) val f = server.future(41 + 1) println(f()) }