2009年9月27日日曜日

Scala で待ち行列

しばらく Scala を触っていなかったので思い出す目的と、Actor を試してみる目的で、以前Erlang で書いた待ち行列のシミュレーションを Scalaで書き換えてみる事にした。
package exercise.actors

import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable.Queue;
import scala.actors.TIMEOUT  

case class Done(diff:Long)

object ExpRandom {
 def rand(m:Double):Double = {
      -m * Math.log(Math.random)
 }
}

class Service(observer: Observer) extends Actor {
  val μ = 0.5 //平均サービス率: 平均サービス時間2秒の逆数
  val queue = new Queue[Long]
  def serviceTime(): Double = {
    ExpRandom.rand(1.0 / μ)
  }
  val worker: Actor = new Actor {
    def act() {
      loop {
        receive {
       case 'serve =>
         Thread.sleep(Math.round(serviceTime() * 1000))
         sender ! 'afterProcessed
     }
      }
    }
  }
  worker.start
  def act() {
    var inService: Boolean = false
    loop {
      receive {
        case 'customerIn =>
          queue.enqueue(System.currentTimeMillis())
          Console.println("queue.length=" + queue.length)
          self! 'doService
        case 'afterProcessed =>
          inService = false
          queue.dequeue
          self! 'doService
        case 'doService =>
          if (!inService) inService = doService()
      }
    }
  }
  def doService(): Boolean = {
 if (queue.isEmpty) false
    else {
      val elapsed = System.currentTimeMillis() - queue.front
      observer! Done(elapsed)
      worker! 'serve
      true
 }
  }
}

class Customer(service: Service) extends Actor {
  val λ = 0.4 // 毎秒0.4個のリクエストが到着
  var interval: Double = 0

  def arrivalInterval(): Double = {
 ExpRandom.rand(1.0 / λ)
  }

  def act() {
    loop {
      receiveWithin(Math.round(interval * 1000L)) {
        case TIMEOUT =>
          service! 'customerIn
          interval = arrivalInterval()
      }
    }
  }
}

class Observer extends Actor {
  var requestCount:Int = 0
  var totalWaitingTime:Double = 0
  def act() {
    loop {
   receive {
     case Done(time)=>
          totalWaitingTime += time
          requestCount += 1
       Console.printf(
         "リクエスト%d; 待ち時間:%dms; 平均待ち時間=%dms;%n", 
         requestCount, 
         time, 
            Math.round(totalWaitingTime / requestCount))
       }
    }
  }
}

object QueueTheoryTest extends Application {
  val observer = new Observer
  val service = new Service(observer)
  val customer = new Customer(service)
  
  observer.start
  service.start
  customer.start
}
(シンボルを表すアポストロフィが文字列をくくるクオーテーションマークに認識されているらしく、Prettify が上手く動かない・・・) actor customerから平均到着率(λ)0.4個/秒でリクエストが届き、actor service が平均サービス率(μ)0.5個/秒でリクエストを処理する設定にしてみた。ρ=0.4/0.5、Ts=1/0.5 なので、平均待ち時間は Tw = ρ / (1 - ρ) * Ts = 8秒 = 8000ms となることが期待されるが、以下のような結果になった。
リクエスト1998; 待ち時間:19875ms; 平均待ち時間=8527ms;
queue.length=10
リクエスト1999; 待ち時間:25484ms; 平均待ち時間=8535ms;
リクエスト2000; 待ち時間:25047ms; 平均待ち時間=8544ms;
リクエスト2001; 待ち時間:25266ms; 平均待ち時間=8552ms;
queue.length=8
リクエスト2002; 待ち時間:22531ms; 平均待ち時間=8559ms;
なんか微妙。期待値8000msより結構大きい値になった。リクエスト2000個でもなかなか予想する値に落ち着かない。λ=2、μ=2.5にして回転率を上げてみるとリクエスト5000個で期待値1600ms に対して 1455ms と少し小さめの数値になった。なんだろう?もっと待てば予想する値に近づくのか、Java の乱数の分布やJava/Scalaのリアルタイム処理のクセなのか、あるいはコードに間違いがあるのか良くわからない。ただ今回は Actor の使い方がざっくりと解ればいいので、余り深く突っ込むのは止めておいた。

Erlang から Scala に移植するに当たって、erlang:send_after に当たるものが、どうも Scalaには見当たらなかったので、もう一つの Actor を使ってThread.sleepさせてメッセージ送信を遅延させるコードを書いた。「actor {}」ではなくて「new Actor」を使っているが、前者だと上手く動かず、調べておきたいところだが今回は時間切れ。あと react を使ってメッセージをやりとりしているときにThread.sleep()すると、他の Actorまで止まってしまうのだろうと予想していたが、この実験ではreceive でも react でもあまりはっきりした違いは見られなかった。後でちゃんと調べておこう。

まあ、せっかく勉強し始めた Scala の忘却防止と、少量の知識追加としては、今回はこんなもんだろう。あと Scala に移植しているうちに Erlang側コードに間違いやらまずいコードがあったのに気付いたので、これも後で直しておく。

0 件のコメント:

コメントを投稿