Erlang の無限リストを書く場合、デフォで遅延評価ってわけじゃないので、[先頭要素|fun()-> 再帰呼出し end]として記述した上で、例えば[H|F] なら F()と明示的に呼び出して次の[H'|F']を得るという作法になるが、実装できないことはない。
これを活用して、Haskell の Data.Numbers.Primes をパクってみた。
元の Haskell コードを思い切って一言で言ってしまうと、「ホイールで生成した素数候補の無限リストを、PriorityQueue に蓄積された倍数を用いてふるいにかけつつ、同時にPriorityQueue も随時更新していくコード」といったもの。
なのでポイントとしては、ホイールと、無限リストと、PriorityQueue と、「ふるい」そのものって感じだけど、今回はホイール自体の生成は割愛し(簡単だし)ベタ書きのハードコードとした。
こんなんなった。
-module(primes). -compile(export_all). enqueue(Ns, Queue) -> merge({ Ns, [] }, Queue). merge({}, Y) -> Y; merge(X, {}) -> X; merge(X, Y) -> case prio(X) =< prio(Y) of true -> join(X, Y); _ -> join(Y, X) end. prio({[X|_], _}) -> X. join({Ns, Qs}, Q) -> {Ns, [Q|Qs]}. mergeAll([]) -> {}; mergeAll([Q]) -> Q; mergeAll([Q1|[Q2|Qs]]) -> merge(merge(Q1, Q2), mergeAll(Qs)). spin(Start, Wheel) -> [Start| fun() -> spin(Start, Wheel, Wheel) end]. spin(Prev, [], Wheel) -> spin(Prev, Wheel, Wheel); spin(Prev, [W|Ws], Wheel) -> Prime = Prev + W, [Prime | fun() -> spin(Prime, Ws, Wheel) end]. comps([N|Nl]) -> [N*N| fun()-> comps(N, Nl()) end ]. comps(P, [N|Nl]) -> [P*N| fun()-> comps(P, Nl()) end ]. sieve([N|Ns], {}) -> [N| fun()-> sieve(Ns(), enqueue(comps([N|Ns]), {})) end]; sieve([N|Ns], Queue) -> {[M|Ms], Qs} = Queue, if N < M -> [N| fun()-> sieve(Ns(), enqueue(comps([N|Ns]), Queue)) end]; true -> Q = mergeAll(Qs), if M == N -> sieve(Ns(), enqueue(Ms(), Q)); M < N -> sieve([N|Ns], enqueue(Ms(), Q)) end end. to_lazy([], Another) -> Another; to_lazy([P|Ps], Another) -> [P | fun()->to_lazy(Ps, Another) end]. primes([P|Ps], Wheel) -> sieve(to_lazy(lists:reverse(Ps), spin(P, Wheel)), {}). nth(1, [P|_]) -> P; nth(N, [_|Pl]) -> nth(N-1, Pl()). main(N) -> nth(N, primes([7,5,3,2], [4,2,4,2,4,6,2,6])).実行すると以下のようになった。
2> timer:tc(primes, main, [1000000]). {67717516,15485863}100万番目の素数 15485863 を得るのに、1分ちょいかかってしまっているが、まあこんなもんだ。(以前、PriorityQueue を知らずに、同じように Erlang で無限素数リストを作ってみた事があったが、それにくらべると遥かに速い。)
■ コードをざっと解説すると、まず無限リストだけど、ここでは次のものが該当する。
- 素数候補の無限リスト: 例えば、ホイールが[2,4]だと、5から始めて[2,4,2,4,..]と足していって、[5,7,11,13,17....]を生成していく。下のコードでは、関数 spin/2, spin/3 がこれをやっている。
- 合成数の無限リスト: 素数候補リスト中のある数と、その数以降の部分リストを用いて、合成数の無限リストを作る。例えば 7ならば、[7*7,7*11,7*13,7*17....] となる。判別された素数ごとに、この無限リストが生成される。comps/1, comps/2 がそれ。
Haskell の コードだけ眺めていたときは、さっぱりが訳が分からなかったけど、
sieve [3,5..] Empty から開始して 12,13個の素数を得る過程を、紙とシャーペンの筆算で評価したり図に書いたりしていると、ほぼ体得できた。
元のコードと同様、Pairing Heap 様に構成し、合成数の無限リストのそれぞれの先頭要素を比較して優先度を判定している。
型としては、元の Haskell コードでは、
data Queue = Empty | Fork [Integer] [Queue]といった形で定義されているけど、ここでは単純に Empty を空タプル{}、Fork を{ 倍数リスト、[倍数リスト] }として表現することにした。
enqueue/2, merge/2, prio/2, join/2, mergeAll/1 が該当する。
■ 3. そして「ふるい」。
与えられた素数候補のリストの先頭要素を、PriorityQueue の頂上にある合成数と比較して、同じなら合成数としてキャンセル、小さければ素数と判定するというコード。素数候補リスト先頭要素が、PriorityQueue 頂上の合成数より大きくなってしまう場合もあるが、その際には PriorityQueue からその合成数を除外して再トライする。
まあ、ざっくり言うとそんな感じだけど、この辺りは自然言語でコード以上に簡潔に説明するのは難しい。