17.10 メールボックス

メールボックスはプロセス同期と通信のための、高度で柔軟な構造物です。メッセージの送受信が可能です。ここで メッセージ とは任意のオブジェクトを指します。シグナルのタイムアウトに使う TIMEOUT という特別のメッセージがあります。

case object TIMEOUT

メールボックスは次のシグネチャを実装します。

class MailBox { def send(msg: Any) def receive[A](f: PartialFunction[Any, A]): A def receiveWithin[A](msec: Long)(f: PartialFunction[Any, A]): A }

メールボックスの状態はメッセージのマルチセットから成ります。メッセージは send メソッドで メールボックスへ追加されます。メッセージは receive メソッドで メールボックスから取り除かれ、メッセージプロセッサ f の引数に渡されます。 f はメッセージから何らかの結果型への部分関数です。通常、この関数はパターンマッチ式で実装されます。 receive メソッドは、そのメッセージプロセッサが定義されたメールボックスにメッセージが届くまで、ブロックされます。マッチしたメッセージはメールボックスから 取り除かれ、ブロックされていたスレッドは再スタートしてメッセージプロセッサをそのメッセージに適用します。送られたメッセージとレシーバの双方とも時 間順に並べられます。レシーバ r は、マッチしたメッセージ m へ適用されますが、それは、各コンポーネントを時間順に並べた個別の順序中で、m, r に先立つ {メッセージ,レシーバ}ペアが他にないときに限ります。

メールボックスの使い方の簡単な例として one-place バッファを考えてみましょう。

class OnePlaceBuffer { private val m = new MailBox // An internal mailbox private case class Empty, Full(x: Int) // Types of messages we deal with m send Empty // Initialization def write(x: Int) { m receive { case Empty => m send Full(x) } } def read: Int = m receive { case Full(x) => m send Empty; x } }

メールボックスクラスは、次のようにも実装できます。

class MailBox { private abstract class Receiver extends Signal { def isDefined(msg: Any): Boolean var msg = null }

テストメソッド isDefined を備えたレシーバ用の内部クラスを定義し、与えられたメッセージに対してレシーバが定義されているかどうかを示すようにします。レシーバは Signal クラスから、レシーバスレッドを起動するのに使われる nofify メソッドを継承します。レシーバスレッドが起動されると、適用すべきメッセージは Reciever の msg 変数に保存されます。

private val sent = new LinkedList[Any] private var lastSent = sent private val receivers = new LinkedList[Receiver] private var lastReceiver = receivers

メールボックスクラスは2つの連結リストを保持していて、一つは、送信されたけれど取り出されていないメッセージ用で、もう一つは、ウェイトしているレシーバ用のものです。

def send(msg: Any) = synchronized { var r = receivers, r1 = r.next while (r1 != null && !r1.elem.isDefined(msg)) { r = r1; r1 = r1.next } if (r1 != null) { r.next = r1.next; r1.elem.msg = msg; r1.elem.notify } else { lastSent = insert(lastSent, msg) } }

send メソッドは最初に、ウェイトしているレシーバがその送信されたメッセージに適用可能かどうかチェックします。もしそうなら、レシーバに通知されます。そうでなければ、メッセージは送信されたメッセージの連結リストに追加されます。

def receive[A](f: PartialFunction[Any, A]): A = { val msg: Any = synchronized { var s = sent, s1 = s.next while (s1 != null && !f.isDefinedAt(s1.elem)) { s = s1; s1 = s1.next } if (s1 != null) { s.next = s1.next; s1.elem } else { val r = insert(lastReceiver, new Receiver { def isDefined(msg: Any) = f.isDefinedAt(msg) }) lastReceiver = r r.elem.wait() r.elem.msg } } f(msg) }

recieve メソッドは最初に、メッセージプロセッサ関数 f が、既に送信されたけれどもまだ取り出されていないメッセージに適用可能かどうかチェックします。もしそうなら、スレッドは続けてすぐに f をそのメッセージに適用します。そうでなければ、新しいレシーバが作られてレシーバリストへリンクされ、スレッドはそのレシーバ上の通知を待ちます。ス レッドは再び起動されると、f をそのレシーバに保存されたメッセージに適用します。 連結リストの insert メソッドは次のように定義されています。

def insert(l: LinkedList[A], x: A): LinkedList[A] = { l.next = new LinkedList[A] l.next.elem = x l.next.next = l.next l }

メールボックスクラスは、指定された最大時間だけブロックする receiveWithin メソッドも提供しています。メッセージを指定された時間(ミリ秒で与えられる)以内に受信しなければ、メッセージプロセッサ引数 f は TIMEOUT という特別のメッセージでアンブロックされます。 recieveWithin の実装は receive とほとんど同じです。

def receiveWithin[A](msec: Long)(f: PartialFunction[Any, A]): A = { val msg: Any = synchronized { var s = sent, s1 = s.next while (s1 != null && !f.isDefinedAt(s1.elem)) { s = s1; s1 = s1.next } if (s1 != null) { s.next = s1.next; s1.elem } else { val r = insert(lastReceiver, new Receiver { def isDefined(msg: Any) = f.isDefinedAt(msg) }) lastReceiver = r r.elem.wait(msec) if (r.elem.msg == null) r.elem.msg = TIMEOUT r.elem.msg } } f(msg) } } // end MailBox

違いは、制限時間つきの wait コールと、その後の文だけです。