work.log

元エンジニアの備忘録的ブログ

Coroで作ったクローラもどきにスレッド管理機能をつける

投稿:2014-04-15 20:50  更新:

Coro のメモ書き。

下記の記事で作成した、クローラもどきに「スレッド管理機能」をつけてみます。

何でコレが必要かと言うと、上記のコードのままだとメイン -> worker (fetcher と receiver) にスレッドが移ったまま、メインスレッドに戻る事ができないのでプログラムが終了しません。

※ 多分、こういう認識だと思う…

ということで、今回は worker スレッドを管理する manager スレッドを追加してみます。

では早速コード。

#!/usr/bin/perl

use strict;
use warnings;
use Coro;
use Coro::Timer;
use Coro::LWP;
use LWP::UserAgent;
use XML::FeedPP;

    my $queue = {};
    my @url   = ();
    my @done  = ();
    my @fail  = ();

    my @feed = (
           'http://example.jp/feed',
           'http://example.com/feed',
       );

    foreach my $url (@feed) {

        my $lwp = LWP::UserAgent->new();
        my $res = $lwp->get($url);
        my $feed = XML::FeedPP->new($res->content);

        foreach ($feed->get_item()) {
            push(@url, $_->link());

        }

    }

    ## worker (fetcher) スレッドは 3 つ作成
    worker( fetcher => \&fetcher, 3 );

    ## manager スレッドは 1 つ作成
    manager( scalar(@url) );

    ## fetcher に url を送りつける
    queue("fetch")->put($_) for @url;

    ## 各スレッドを呼び起こす
    schedule;

    ## 全て終わった後にココに戻ってくる
    ## 取得に成功したデータを表示
    foreach (@done) {
        print "$_\n";
    }

## 成功したデータを格納するサブルーチン
sub done {
    push(@done, $_[0]);
}

## 失敗した際のデータを格納するサブルーチン
sub fail {
    push(@fail, $_[0]);
}

sub queue {
    my $name = shift;
    $queue->{$name} ||= Coro::Channel->new;
}

## worker スレッドを作成するサブルーチン
sub worker {

    my ($name, $code, $num) = @_;

    for (1 .. $num) {

        async {

            while( 1 ) {
                $code->();
            }

        };

    }

}

## manager スレッドを作成するサブルーチン
sub manager {

    my $job = shift;

    async {

        while( 1 ) {

            ## Coro::Timer で 1sec 毎にスリープさせる
            ## この間に処理が他に移る
            Coro::Timer::sleep 1;

            ## @done, @fail の数を取得しメッセージを表示
            my $done = scalar(@done);
            my $fail = scalar(@fail);

            print "Task: $done/$job Fail: $fail\n";

            ## 全ジョブが完了した場合
            if ( ($done + $fail) == $job) {
                print "completed: Done => $done Fail => $fail\n";
                
                ## メインスレッドに戻す
                $Coro::main->ready;
            }

        }

    };

}

sub fetcher  {

    my $url = queue("fetch")->get;
    my $lwp = '';
    my $res = 0;

    $lwp = LWP::UserAgent->new();
    $res = $lwp->get($url);

    ## 成功したデータを @done に格納
    if ($res->is_success) {
        done($res->title);

    ## 失敗した url を @fail に格納
    } else {
        fail("fail:$url");
    }

}

コメント箇所は多分こんな感じという認識で書いています。

これを実行するとこうなります。

$ perl my_coro.pl
Task: 3/30 Fail: 0
Task: 8/30 Fail: 0
Task: 11/30 Fail: 0
Task: 16/30 Fail: 0
Task: 24/30 Fail: 0
Task: 28/30 Fail: 2  # ここはわざと fail する url を指定してみた
completed: Done => 28 Fail => 2

~ 以下、取得できたタイトル ~

こんな感じにうまくできました。

何処かでスレッドを起こす、終わったら眠らせるみたいな説明を読んでイマイチ理解できずにいましたが、今日改めて見たら何となくですが理解できた気がします。

参考にしたページ
Coroのscheduleからの戻り方