work.log

元エンジニアの備忘録的ブログ

Coroで作ったクローラもどきのスレッドをセマフォで制御する

投稿:2014-04-17 18:47  更新:

Coro のメモ書き。

前回の記事で最大の懸念事項だった「LWP 利用時の timeout 処理」が何とか実装できたので、次は Coro::Semaphore を使って同一ホストへの接続を厳密に制御してみたいと思います。

今回はついでに HTML 解析処理も付け加えてみました。

#!/usr/bin/perl

use strict;
use warnings;
use Coro;
use Coro::Timer;
use Coro::Semaphore;
use Coro::LWP;
use LWP::UserAgent;
use URI;
use XML::FeedPP;
use HTML::TreeBuilder;
use Digest::MD5 qw/md5_hex/;
use List::Util;

    my $lock  = {};
    my $queue = {};
    my @url   = ();
    my @done  = ();
    my @fail  = ();


    ## 対象の url リストを適当に
    my @url   = ( .... );

    ## ランダムソート
    @url = List::Util::shuffle @url;

    ## fetcher, parser, manager のスレッドを作成
    worker( fetcher => \&fetcher, 5 );
    worker( parser => \&parser, 1 );
    manager( scalar(@url) );

    ## fetcher に url を送りつけて処理開始
    queue("fetch")->put($_) for @url;
    schedule;

    ## 取得出来たデータを表示
    foreach (@done) {
        print "$_\n";
    }

sub done {
    push(@done, $_[0]);
}

sub fail {
    push(@fail, $_[0]);
}

sub queue {
    my $name = shift;
    $queue->{$name} ||= Coro::Channel->new;
}

sub worker {

    my ($name, $code, $num) = @_;

    for (1 .. $num) {

        my $desc = $name . "_" . rand_md5();

        async {

            Coro::current->desc($desc);

            while ( 1 ) {
                $code->();
            }

        };

    }

}

sub manager {

    my $job = shift;

    async {

        while ( 1 ) {

            Coro::Timer::sleep 1;

            my $done = scalar(@done);
            my $fail = scalar(@fail);
            my $task = int(($done + $fail) / $job * 100);

            ## 進捗表示
            print "Task: $task % Total: $job Done: $done Fail: $fail\n";

            my @fetcher =  Coro::State::list;

            foreach my $fetcher (@fetcher) {

                next if (!$fetcher->desc);
                next if ($fetcher->desc !~ /fetcher/);
                next if (! defined($fetcher->{alive}));

                ## 状態が idle ではない、かつ、生存時間が切れた fetcher を終了させる
                if ($fetcher->{alive} <= 0 && ! $fetcher->{idle}) {

                    print "timeout: ", $fetcher->desc, "\n";
                    $fetcher->cancel();
                    fail("timeout: ", $fetcher->desc);

                    ## fetcher を終了させた後は lock を解除
                    $lock->{$fetcher->{host}}->up;

                    ## フレッシュな fetcher を生成
                    worker( fetcher => \&fetcher, 1 );

                } else {

                    print "alive: ", $fetcher->desc, " keep alive = $fetcher->{alive} cur = $fetcher->{status}\n";

                    ## 状態が idle ではない fetcher の生存時間を減算
                    if (! $fetcher->{idle}) { $fetcher->{alive} -= 1; }

                }


            }

            if ( ($done + $fail) >= $job) {
                print "completed: Done => $done Fail => $fail\n";
                $Coro::main->ready;
            }

        }
    };

}

sub fetcher  {

    my $url = queue("fetch")->get;
    my $host = URI->new($url)->host;

    ## domain 毎の lock。最大で 2
    my $sem = $lock->{$host} ||= Coro::Semaphore->new(2);

    my $lwp = '';
    my $res = 0;

    ## manager が lock を解除する為に参照
    $Coro::current->{host} = $host;

    ## debug 用の状態遷移
    $Coro::current->{status} = "$host [lock]";

    ## lock 処理 ($lock->{$host}->count -= 1 ?)
    $lock->{$host}->down;

    $Coro::current->{status} = "$host [run]"; 
    $Coro::current->{idle} = 0;
    $Coro::current->{alive} = 10;

    $lwp = LWP::UserAgent->new();
    $res = $lwp->get($url);

    if ($res->is_success) {

        ## parser に取得出来たコンテンツを送りつける
        ## put はブロックしないので送りつけた後はそのまま次に進む
        queue("parser")->put($res->content);

    } else {
        fail("fail:$url");
    }

    ## 3 秒の wait 処理
    $Coro::current->{status} = "$host [wait]"; 
    $Coro::current->{idle} = 1;
    Coro::Timer::sleep 3;

    $Coro::current->{status} = 'idle'; 
    $Coro::current->{alive} = 10;

    $lock->{$host}->up;

}

sub parser {

    my $content = queue("parser")->get;

    my $tree = HTML::TreeBuilder->new;
    $tree->parse($content);
    $tree->eof();

    ## html から <title> を探して取得
    if ($tree->find('title')) {

        foreach ($tree->find('title')) {

            done($_->as_text);
            last;

        }

    } else {

        fail("notitle");

    }

}

sub rand_md5 {

    my $max = 9999;
    my $min = 1000;

    my $rand = time() . int( rand($max - $min + 1) ) + $min;
    my $md5 = md5_hex($rand);

    return($md5);

}

実行結果はこんな感じ。

Task: 89 % Total: 28 Done: 25 Fail: 0
alive: fetcher_d67894206aacfd6a936acc8435a1ee3e keep alive = 9 cur = example.com [wait]
alive: fetcher_3d20f4d6066abca3361baa67301bbab0 keep alive = 10 cur = worklog.be [lock]
alive: fetcher_52807581c57fcdd3469997105f8f0a46 keep alive = 9 cur = example.com [wait]
alive: fetcher_0eef0cdf7f07313d552d60cdc30ad7de keep alive = 9 cur = worklog.be [wait]
alive: fetcher_814d7a04c57d0be1185b9cfee9115fe4 keep alive = 9 cur = worklog.be [wait]

Task: 92 % Total: 28 Done: 26 Fail: 0
alive: fetcher_d67894206aacfd6a936acc8435a1ee3e keep alive = 10 cur = example.com [wait]
alive: fetcher_3d20f4d6066abca3361baa67301bbab0 keep alive = 10 cur = worklog.be [run]
alive: fetcher_52807581c57fcdd3469997105f8f0a46 keep alive = 10 cur = worklog.be [run]
alive: fetcher_0eef0cdf7f07313d552d60cdc30ad7de keep alive = 10 cur = idle
alive: fetcher_814d7a04c57d0be1185b9cfee9115fe4 keep alive = 10 cur = idle

Task: 100 % Total: 28 Done: 28 Fail: 0
alive: fetcher_d67894206aacfd6a936acc8435a1ee3e keep alive = 10 cur = example.com [wait]
alive: fetcher_3d20f4d6066abca3361baa67301bbab0 keep alive = 9 cur = worklog.be [wait]
alive: fetcher_52807581c57fcdd3469997105f8f0a46 keep alive = 9 cur = worklog.be [wait]
alive: fetcher_0eef0cdf7f07313d552d60cdc30ad7de keep alive = 10 cur = idle
alive: fetcher_814d7a04c57d0be1185b9cfee9115fe4 keep alive = 10 cur = idle

completed: Done => 28 Fail => 0

? 以下、取得できたタイトル ?

前回までは待機している fetcher がいればどんどん url を投げつけて即刻 LWP していましたが、今回はドメイン名毎の同時接続数を 2 で制御して上限値の場合は lock 状態に。

fetcher は常時 5 ついますが、デバッグ用のメッセージを見てる限り上手くできたかと思います。

fetcher はこんな感じに状態遷移していきます。

  1. lock (上限に達していなければ次へ)
  2. run (LWP の処理中)
  3. wait (取得完了後の wait 処理)
  4. idle (何もタスクがない状態。url が送られて来たら 1 へ)

lock 処理は最初 $lock->guard みたいにしていたのですが、これだと終了時にプログラムがコアダンプしてしまう。多分、未開放のスレッドがいるのに ready されたから ?

でもこのコードも最後は $lock->up する前に終わってるしここがあやふや。

多分、使ってるうちに問題が見えてくるかもしれないのでもうちょっと揉んでみたいと思います。

おすすめのVPSサーバ

  • OSが選べる
  • VPS同士でLANが組める
  • 複数台構成向き

このブログで使っています。

  • 転送量が多いサービスに
  • 借りてるのは3年間一度もdown無し!

よく見られている記事

  • 本日
  • 週間
  • 月間