2009年8月20日木曜日

Erlang で待ち行列

Erlang の勉強のために情報処理技術者試験なんかでお馴染みの待ち行列理論を実験してみる。

待ち行列理論によると、あるサービスへの要求が単位時間に平均λ回発生(ポアソン分布で)し、またそのサービスが単位時間に平均μ個の要求を処理するとすると、待ち時間は W =ρ / (1 - ρ) * Ta(ここでρ=λ / μ, Ta = 1 / μ)で得られると言う。

例えば1時間に平均 7.5人診る歯医者に患者が毎時平均 6人来診するとした場合、
ρ = 6 / 7.5 = 0.8,
Ta = 1 / μ = 8分,
W = 0.8 / (1 - 0.8) * 8分 = 32分
となり、平均32分の待ち時間が予想される。

これと同じ条件でErlang でシミュレートして、理論を検証してみようと思う。ただし時間は早送りでやってみる。すなわち1時間=60分を6秒に縮尺して試行する。

また、せっかくの Erlang なので、プロセスを活用してみたい。
以下の3つのプロセスでやってみる。
  • customerプロセス:ポアソン分布に従い、繰り返して要求を出す
  • serviceプロセス:一定の処理時間で順番に一個ずつ要求を処理する
  • observer プロセス:要求処理が開始される度に、その要求の待機時間を集計する
customerプロセスは以下のようなコード。要求発生回数をポアソン分布に従わせるかわりに、時間間隔を指数分布に従わせる。
-module(customer).
-export([start_customer/1,customer_loop/1]).

%customer のプロセスを生成し、初期化メッセージを送る
start_customer(Service) ->
  C = spawn(customer, customer_loop, [100]),
  C! {start, Service},
  C.

% customer のプロセス.
% startメッセージで与えられたPid に指数分布に従う時間間隔で
% メッセージを送り続ける
customer_loop(Time) ->
  receive
    stop ->
      io:format("stopped~n",[]);

    {start, Service} ->
      {A1,A2,A3} = now(),
      random:seed(A1, A2, A3),
      put(service, Service),
      customer_loop(100)

  after 
    Time ->
    get(service)! {enter},
      customer_loop(round(exp_dist_rand() * 100))
  end.

% 0~120の範囲で0.2刻みの指数分布に従う数を、ランダムに発生させる。
exp_dist_rand() -> rand_sub(1, random:uniform()) / 5.

rand_sub(6000, _)   -> 6000;
rand_sub(S, Random) -> select(S, 1 - math:exp(-6 * S / 300), Random).

select(S, V, Random) when V > Random -> S - 1;
select(S, V, Random) when V == Random-> S;
select(S, _, Random) -> rand_sub(S + 1, Random).
serviceプロセスは以下のようなコード。(定期的にキューを監視しているのが、何だか格好悪い。うまくやれば改善できそう。)
-module(service).
-export([start_service/1, service_loop/0]).

% service プロセスの生成
start_service(O) ->
  S = spawn(service, service_loop, []),
  S! {init, O},
  S.

% service プロセス
% customer プロセスからのリクエストを1個0.8秒で処理している真似をする
% 窓口一個の待ち行列。
service_loop() ->
  receive
  % 停止
  stop->io:format("stopped~n"), 0;

  % 初期化
  {init, O}->
    put(queue, queue:new()),
    put(observer, O),
    service_loop();

  % customer プロセスからのリクエスト
  {enter}->
    put(queue, queue:in(now(), get(queue))),
    service_loop();

  % リクエスト処理後の始末。次のリクエスト処理を許可する。
  {after_processed}->
    erase(in_service),
    {{value, _}, Q2} = queue:out(get(queue)),
    put(queue, Q2),
    service_loop()

  % 5ms 間隔でキューを見て、リクエストがあれば処理
  after
    5->
    In_Service = get(in_service),
    if 
      true /= In_Service -> do_service();
      true->0
    end,
    service_loop()
 end.

% 0.8 秒かかる処理のシミュレーション
% 開始時に待ち時間をobserver プロセスに送信する
do_service()->
  Queue = get(queue),
  Is_Empty = queue:is_empty(Queue),
  if
    not Is_Empty ->
      put(in_service, true),

      {value, Start} = queue:peek(Queue),

      Diff = timer:now_diff(now(), Start),
      get(observer)! {done, Diff, queue:len(Queue)},

      erlang:send_after(800, self(), {after_processed});
    true -> 0
  end.

observerプロセスは以下のようなコード。service プロセスは要求の処理を始めるたびに、その要求がキューで待たされた時間を送ってくるので、その都度平均待ち時間を更新しコンソールに表示する。
-module(observer).
-export([start_observer/0, observer_loop/0]).

% シミュレーション観測者のプロセスを開始する
start_observer() ->
  O = spawn(observer, observer_loop, []),
  O! init,
  O.

% service から待ち時間の報告を受けて集計する
observer_loop() ->
  receive
    init->
      put(request_count, 0), 
      put(average_waiting_time, 0),
      observer_loop();
    stop->
      io:format("stopped~n"),
      0;
    {done, Time, Len}->
      {RC, AWT} = get_awt(Time),
      io:format("C=~p; A=~p; T=~p; L=~p~n", 
      [RC, round(AWT/100)/1000, round(Time/100)/1000, Len]),
      observer_loop()
  end.

% 各リクエストの処理待ち時間で、プロセス辞書を更新する
get_awt(Time)->
  RC = get(request_count),
  AWT = get(average_waiting_time),
  New_AWT = (AWT * RC + Time) / (RC + 1),
  put(average_waiting_time, New_AWT),
  put(request_count, RC + 1),
  {RC + 1, New_AWT}.
実行は以下のように始める。
86> O = observer:start_observer().
<0.201.0>
87> S = service:start_service(O).
<0.203.0>
88> C = customer:start_customer(S).
<0.205.0>
89> C=1; A=0.16; T=0.16; L=1
89> C=2; A=0.39; T=0.62; L=1
C が要求の個数、A が平均待ち時間、T が最後の要求の待ち時間、L は待ち行列の長さ。理論どおりだと A の値が 32 に近づく予想だが、見た感じ上手くいっているらしい。(まあなんだかんだ時間かかったが。)
89> C=1000; A=31.534; T=36.56; L=4
89> C=1001; A=31.537; T=34.37; L=3
89> C=1002; A=31.545; T=40.0; L=4
89> C=1003; A=31.55; T=35.93; L=5
89> C=1004; A=31.54; T=21.4; L=5
89> C=1005; A=31.537; T=28.6; L=7
89> C=1006; A=31.532; T=27.19; L=7
89> C=1007; A=31.535; T=33.75; L=7
89> C=1008; A=31.544; T=40.94; L=6
89> C=1009; A=31.553; T=40.16; L=7
89> C=1010; A=31.567; T=46.56; L=6
まだまだ下手糞だが、だんだん Erlang --- と言うより並列処理プログラムが分かってきた気がする。昔、よく言われていたオブジェクト指向の説明で、現実世界の模倣なんて言い方があったけど、Erlang のプロセスの方がよほどそれっぽいところがある。分析レベルのオブジェクト指向モデルでは、結構いろんなモノがUML風に言うとアクティブオブジェクトだったりするけど、そういった要素はErlang 的な並列処理アプローチのほうがよほど直接的に実装できるはず。

やはり、職業プログラマとしてヒントやら気づきやらを得ようとして Erlang をいじってみるとしたら、クラスを書いてみるとか既存の関数型言語の練習問題的なものを書くような作業は大して意味がなさそう。そうではなく、いつもの言語で普段やってるいろんな事を、プロセスを使って書いてみたり、或いはプロセスでなければできないことを見つけてみるのが、たぶん一番有益なんだろう。

あと、まだ試していないが Scala にも同じような機能があったはず。こっちも面白そう。

0 件のコメント:

コメントを投稿