Coro のメモ書き。
前回の記事で最大の懸念事項だった「LWP 利用時の timeout 処理」が何とか実装できたので、次は Coro::Semaphore を使って同一ホストへの接続を厳密に制御してみたいと思います。
今回はついでに HTML 解析処理も付け加えてみました。
#!/usr/bin/perl use strict; use warnings; use Coro; use Coro::Timer; use Coro::Semaphore; use Coro::LWP; use LWP::UserAgent; use URI; use XML::FeedPP; use HTML::TreeBuilder; use Digest::MD5 qw/md5_hex/; use List::Util; my $lock = {}; my $queue = {}; my @url = (); my @done = (); my @fail = (); ## 対象の url リストを適当に my @url = ( .... ); ## ランダムソート @url = List::Util::shuffle @url; ## fetcher, parser, manager のスレッドを作成 worker( fetcher => \&fetcher, 5 ); worker( parser => \&parser, 1 ); manager( scalar(@url) ); ## fetcher に url を送りつけて処理開始 queue("fetch")->put($_) for @url; schedule; ## 取得出来たデータを表示 foreach (@done) { print "$_\n"; } sub done { push(@done, $_[0]); } sub fail { push(@fail, $_[0]); } sub queue { my $name = shift; $queue->{$name} ||= Coro::Channel->new; } sub worker { my ($name, $code, $num) = @_; for (1 .. $num) { my $desc = $name . "_" . rand_md5(); async { Coro::current->desc($desc); while ( 1 ) { $code->(); } }; } } sub manager { my $job = shift; async { while ( 1 ) { Coro::Timer::sleep 1; my $done = scalar(@done); my $fail = scalar(@fail); my $task = int(($done + $fail) / $job * 100); ## 進捗表示 print "Task: $task % Total: $job Done: $done Fail: $fail\n"; my @fetcher = Coro::State::list; foreach my $fetcher (@fetcher) { next if (!$fetcher->desc); next if ($fetcher->desc !~ /fetcher/); next if (! defined($fetcher->{alive})); ## 状態が idle ではない、かつ、生存時間が切れた fetcher を終了させる if ($fetcher->{alive} <= 0 && ! $fetcher->{idle}) { print "timeout: ", $fetcher->desc, "\n"; $fetcher->cancel(); fail("timeout: ", $fetcher->desc); ## fetcher を終了させた後は lock を解除 $lock->{$fetcher->{host}}->up; ## フレッシュな fetcher を生成 worker( fetcher => \&fetcher, 1 ); } else { print "alive: ", $fetcher->desc, " keep alive = $fetcher->{alive} cur = $fetcher->{status}\n"; ## 状態が idle ではない fetcher の生存時間を減算 if (! $fetcher->{idle}) { $fetcher->{alive} -= 1; } } } if ( ($done + $fail) >= $job) { print "completed: Done => $done Fail => $fail\n"; $Coro::main->ready; } } }; } sub fetcher { my $url = queue("fetch")->get; my $host = URI->new($url)->host; ## domain 毎の lock。最大で 2 my $sem = $lock->{$host} ||= Coro::Semaphore->new(2); my $lwp = ''; my $res = 0; ## manager が lock を解除する為に参照 $Coro::current->{host} = $host; ## debug 用の状態遷移 $Coro::current->{status} = "$host [lock]"; ## lock 処理 ($lock->{$host}->count -= 1 ?) $lock->{$host}->down; $Coro::current->{status} = "$host [run]"; $Coro::current->{idle} = 0; $Coro::current->{alive} = 10; $lwp = LWP::UserAgent->new(); $res = $lwp->get($url); if ($res->is_success) { ## parser に取得出来たコンテンツを送りつける ## put はブロックしないので送りつけた後はそのまま次に進む queue("parser")->put($res->content); } else { fail("fail:$url"); } ## 3 秒の wait 処理 $Coro::current->{status} = "$host [wait]"; $Coro::current->{idle} = 1; Coro::Timer::sleep 3; $Coro::current->{status} = 'idle'; $Coro::current->{alive} = 10; $lock->{$host}->up; } sub parser { my $content = queue("parser")->get; my $tree = HTML::TreeBuilder->new; $tree->parse($content); $tree->eof(); ## html から <title> を探して取得 if ($tree->find('title')) { foreach ($tree->find('title')) { done($_->as_text); last; } } else { fail("notitle"); } } sub rand_md5 { my $max = 9999; my $min = 1000; my $rand = time() . int( rand($max - $min + 1) ) + $min; my $md5 = md5_hex($rand); return($md5); }
実行結果はこんな感じ。
Task: 89 % Total: 28 Done: 25 Fail: 0 alive: fetcher_d67894206aacfd6a936acc8435a1ee3e keep alive = 9 cur = example.com [wait] alive: fetcher_3d20f4d6066abca3361baa67301bbab0 keep alive = 10 cur = worklog.be [lock] alive: fetcher_52807581c57fcdd3469997105f8f0a46 keep alive = 9 cur = example.com [wait] alive: fetcher_0eef0cdf7f07313d552d60cdc30ad7de keep alive = 9 cur = worklog.be [wait] alive: fetcher_814d7a04c57d0be1185b9cfee9115fe4 keep alive = 9 cur = worklog.be [wait] Task: 92 % Total: 28 Done: 26 Fail: 0 alive: fetcher_d67894206aacfd6a936acc8435a1ee3e keep alive = 10 cur = example.com [wait] alive: fetcher_3d20f4d6066abca3361baa67301bbab0 keep alive = 10 cur = worklog.be [run] alive: fetcher_52807581c57fcdd3469997105f8f0a46 keep alive = 10 cur = worklog.be [run] alive: fetcher_0eef0cdf7f07313d552d60cdc30ad7de keep alive = 10 cur = idle alive: fetcher_814d7a04c57d0be1185b9cfee9115fe4 keep alive = 10 cur = idle Task: 100 % Total: 28 Done: 28 Fail: 0 alive: fetcher_d67894206aacfd6a936acc8435a1ee3e keep alive = 10 cur = example.com [wait] alive: fetcher_3d20f4d6066abca3361baa67301bbab0 keep alive = 9 cur = worklog.be [wait] alive: fetcher_52807581c57fcdd3469997105f8f0a46 keep alive = 9 cur = worklog.be [wait] alive: fetcher_0eef0cdf7f07313d552d60cdc30ad7de keep alive = 10 cur = idle alive: fetcher_814d7a04c57d0be1185b9cfee9115fe4 keep alive = 10 cur = idle completed: Done => 28 Fail => 0 ? 以下、取得できたタイトル ?
前回までは待機している fetcher がいればどんどん url を投げつけて即刻 LWP していましたが、今回はドメイン名毎の同時接続数を 2 で制御して上限値の場合は lock 状態に。
fetcher は常時 5 ついますが、デバッグ用のメッセージを見てる限り上手くできたかと思います。
fetcher はこんな感じに状態遷移していきます。
- lock (上限に達していなければ次へ)
- run (LWP の処理中)
- wait (取得完了後の wait 処理)
- idle (何もタスクがない状態。url が送られて来たら 1 へ)
lock 処理は最初 $lock->guard みたいにしていたのですが、これだと終了時にプログラムがコアダンプしてしまう。多分、未開放のスレッドがいるのに ready されたから ?
でもこのコードも最後は $lock->up する前に終わってるしここがあやふや。
多分、使ってるうちに問題が見えてくるかもしれないのでもうちょっと揉んでみたいと思います。