17.1 シグナルとモニター (Signals and Monitors)

17.1 シグナルとモニター (Signals and Monitors)

Example 17.1.1 モニター は Scala におけるプロセスの相互排他処理の基本的な手段を提供します。 AnyRef クラスの各インスタンスは、次の1つあるいは複数のメソッドを呼ぶことでモニターとして使えます。

def synchronized[A] (e: => A): A def wait() def wait(msec: Long) def notify() def notifyAll()

同期メソッドは相互排他的に、つまり同時にはただ1つのスレッドだけが与えられたモニターの同期化引数を実行できる方式で、その引数である計算 e を実行します。

スレッドはシグナルをウェイトすることで、モニター内でサスペンドできます。 wait メソッドを呼び出すスレッドは、同じオブジェクトの notify メソッドが他のスレッドからその後呼ばれるまで、ウェイトします。

ウェイトの制限時間つき形態もあり、シグナルを受信するか、あるいは指定された時間(ミリ秒で与えられる)が過ぎるまでブロックします。さらにまた、シグ ナルを待つ全てのスレッドをアンブロックする notifyAll メソッドもあります。これらのメソッドは Monitor クラスと同様に、Scala ではプリミティブです。つまり、それらは実行時システムによって実装されています。

典型的には、スレッドはある条件が確立するまでウェイトします。もしその条件が wait をコールするまでに確立していなければ、そのスレッドは、他のスレッドがその条件を確立するまで、ブロックします。notify あるいは notifyAll を発行してウェイトしているプロセスを起動するのは、他のスレッドの責任です。しかし、ウェイトしているプロセスが notify 発行後すぐに実行されるという保証はありません。他のプロセスが最初に実行され、その条件を無効にすることもあり得ます。ですから、条件 C の確立をウェイとする正しい形は、while ループを使うことです。

while (!C ) wait()

モニターの使用例として、境界付きバッファの実装をあげておきます。

class BoundedBuffer[A](N: Int) { var in = 0, out = 0, n = 0 val elems = new Array[A](N) def put(x: A) = synchronized { while (n >= N) wait() elems(in) = x ; in = (in + 1) % N ; n = n + 1 if (n == 1) notifyAll() } def get: A = synchronized { while (n == 0) wait() val x = elems(out) ; out = (out + 1) % N ; n = n - 1 if (n == N - 1) notifyAll() x } }

境界付きバッファを使って生産プロセスと消費プロセスとの間でコミュニケーションをとるプログラムを見てみましょう。

import scala.concurrent.ops._ ... val buf = new BoundedBuffer[String](10) spawn { while (true) { val s = produceString ; buf.put(s) } } spawn { while (true) { val s = buf.get ; consumeString(s) } } }

spawn メソッドは、パラメータで与えられた式を実行する新しいスレッドを生成します。それはオブジェクト scala.concurrent.ops において、次のように定義されています。

def spawn(p: => Unit) { val t = new Thread() { override def run() = p } t.start() }