work.log

元エンジニアの備忘録的ブログ

CoroでLWPを使うスレッドにtimeout処理を入れる

投稿:2014-04-16 17:47  更新:

Coro のメモ書き。

下記の記事で、ようやくそれらしいクローラもどきをつくることができましたが、このコードには致命的な問題があります。

Coro::LWP を使うとかなりお手軽に並行処理ができて便利ではあるのですが、そのままだと LWP が timeout してくれないんです。(オプションを設定しても効かない!)

LWP の timeout は IO::Socket の機能を使っているらしく、Coro::LWP を使ってしまうとここが別のモジュールに置き換わるらしく上手く動かないんだとか。

参考にしたページ
Coro::LWPでタイムアウトさせる

大体の Web サーバは数秒以内にレスポンスを返してくれるとは思いますが、稀にレスポンスを中々返してくれないサーバもいるのでやはり timeout しないのは致命的。

と、いうことで上記記事を参考に管理スレッドに LWP を timeout させるような機能を追加してみました。

#!/usr/bin/perl

use strict;
use warnings;
use Coro;
use Coro::Timer;
use Coro::LWP;
use LWP::UserAgent;
use Digest::MD5 qw/md5_hex/;

    my $queue = {};
    my @url   = ();
    my @done  = ();
    my @fail  = ();

    for (my $i=0; $i<10; $i++) {
        ## ランダムに sleep させる CGI
        push(@url, 'http://example.jp/timer.cgi');
    }

    worker( fetcher => \&fetcher, 3 );
    manager( scalar(@url) );

    queue("fetch")->put($_) for @url;
    schedule;

sub done {
    push(@done, $_[0]);
}

sub fail {
    push(@fail, $_[0]);
}

sub queue {
    my $name = shift;
    $queue->{$name} ||= Coro::Channel->new;
}

sub worker {

    my ($name, $code, $num) = @_;

    for (1 .. $num) {

        ## worker にディスクリプションを設定する
        ## 例: fetcher_hogehoge
        my $desc = $name . "_" . rand_md5();

        async {

            Coro::current->desc($desc);

            while ( 1 ) {
                $code->();
            }

        };

    }

}

sub manager {

    my $job = shift;

    async {

        while ( 1 ) {

            Coro::Timer::sleep 1;

            my $done = scalar(@done);
            my $fail = scalar(@fail);

            print "Task: $done/$job Fail: $fail\n";

            ## Coro::State::list で稼働中のスレッド一覧を取得
            my @fetcher = Coro::State::list;

            foreach my $fetcher (@fetcher) {

                ## ディスクリプションが未設定、fetcher 以外はスキップ
                next if (!$fetcher->desc);
                next if ($fetcher->desc !~ /fetcher/);

                ## 生存時間を減算していく
                $fetcher->{alive} -= 1;

                ## 生存時間が切れた fetcher がある場合
                if ($fetcher->{alive} <= 0) {

                    print "timeout: ", $fetcher->desc, "\n";
                    fail("timeout: ", $fetcher->desc);

                    ## fail カウントして対象の fetcher をキャンセル (timeout)
                    $fetcher->cancel();

                    ## 代わりに新しく 1 つ fetcher を作成する
                    worker( fetcher => \&fetcher, 1 );

                } else {

                    print "alive: ", $fetcher->desc, " keep alive = $fetcher->{alive}\n";

                }

            }

            if ( ($done + $fail) == $job) {
                print "completed: Done => $done Fail => $fail\n";
                $Coro::main->ready;
            }
        }
    };

}

sub fetcher  {

    my $url = queue("fetch")->get;
    my $lwp = '';
    my $res = 0;

    ## fetcher の生存時間を設定 (秒)
    $Coro::current->{alive} = 10;

    $lwp = LWP::UserAgent->new();
    $res = $lwp->get($url);

    #logger($url);

    if ($res->is_success) {
        done($res->title);
    } else {
        fail("fail:$url");
    }

    ## 処理が完了した場合は生存時間を初期値に戻す
    $Coro::current->{alive} = 10;

}

sub rand_md5 {

    my $max = 9999;
    my $min = 1000;

    my $rand = time() . int( rand($max - $min + 1) ) + $min;
    my $md5 = md5_hex($rand);

    return($md5);

}

3 つの worker (fetcher) と 1 つの manager を立ち上げるのは前回と同じ。

違うのは、fetcher に「ディスクリプション」と「生存時間」を設定して manager で状態をチェックしたり、キャンセル (timeout) したりという点です。

こんな感じになります。

  1. fetcher が生存時間 10 秒で稼働
  2. 1 秒毎に manager が fetcher をチェック
  3. 生存時間 を過ぎた fetcher がいたらキャンセルして timeout させる
  4. キャンセルした際には新しく fetcher を起動する

時間はきっちり 10 秒というわけではないので注意。

実行結果はこんな感じになります。

fetcher 稼働中

Task: 0/10 Fail: 0
alive: fetcher_69db8f36576df121fd801071d581827a keep alive = 9
alive: fetcher_9674faa931d6575293f6d733761d5e6b keep alive = 9
alive: fetcher_bf9da072200ae17eb8ce8bc34ddd6cff keep alive = 9

Task: 1/10 Fail: 0
alive: fetcher_69db8f36576df121fd801071d581827a keep alive = 8
alive: fetcher_9674faa931d6575293f6d733761d5e6b keep alive = 8
alive: fetcher_bf9da072200ae17eb8ce8bc34ddd6cff keep alive = 9 # 処理に成功したのでカウントリセット

Task: 1/10 Fail: 0
alive: fetcher_69db8f36576df121fd801071d581827a keep alive = 7
alive: fetcher_9674faa931d6575293f6d733761d5e6b keep alive = 7
alive: fetcher_bf9da072200ae17eb8ce8bc34ddd6cff keep alive = 8
.
.
.

fetcher の timeout

Task: 5/10 Fail: 0
timeout: fetcher_69db8f36576df121fd801071d581827a # カウント 0 で timeout
alive: fetcher_9674faa931d6575293f6d733761d5e6b keep alive = 6
alive: fetcher_bf9da072200ae17eb8ce8bc34ddd6cff keep alive = 1

Task: 5/10 Fail: 1
alive: fetcher_c6f7220bd910d9ba19de024c86ca6774 keep alive = 9 # 新しく生成された fetcher
alive: fetcher_9674faa931d6575293f6d733761d5e6b keep alive = 5
timeout: fetcher_bf9da072200ae17eb8ce8bc34ddd6cff # カウント 0 で timeout

Task: 5/10 Fail: 2
alive: fetcher_f6f7754b1b33eeeaf46210cd32f442bc keep alive = 9 # 新しく生成された fetcher
alive: fetcher_c6f7220bd910d9ba19de024c86ca6774 keep alive = 8
alive: fetcher_9674faa931d6575293f6d733761d5e6b keep alive = 4
.
.
.
-----
completed: Done => 8 Fail => 2

こんな具合にうまくできました。

url が送られて来なくて何もやることがなくなった fetcher は、生存時間が切れてもそのまま動き続けますが全部の処理が完了すると一斉に終了するのでこれで良しとしました。

Coro の LWP timeout のメモ書きはコレで以上です。

おすすめのVPSサーバ

  • OSが選べる
  • VPS同士でLANが組める
  • 複数台構成向き

このブログで使っています。

  • 転送量が多いサービスに
  • 借りてるのは3年間一度もdown無し!

よく見られている記事

  • 本日
  • 週間
  • 月間