work.log

元エンジニアの備忘録的ブログ

CoroのLWPを使うスレッドを一定条件で強制終了させる

投稿:2014-05-13 19:07  更新:

Coro のメモ書き。

下記記事で作成した Coro を使ったクローラをベースに、まともなヤツを書いて動かしていたのですが早速問題が発生。

Coro の処理からメインスレッドに戻るのに、成功、失敗を含めた全処理数をチェックして完了ならメインスレッドへという事をしていましたが、何故かこの処理数が合わない現象が稀に発生してメインスレッドに戻れない現象が。

失敗例はこんな感じ。

Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = example.jp [wait]
Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = example.jp [wait]
Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = example.jp [wait]

## 処理の成功可否が何処かでロスト
## 以降、永遠に終了条件に達しなくなる

Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = idle
Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = idle
Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = idle
・
・
・

これはマズいという事で、下記のような強制終了機能を付け加えてみました。ちなみにこのコードはテストようなので絶対に強制終了します。

#!/usr/bin/perl

use strict;
use warnings;
use Coro;
use Coro::Timer;
use Coro::Semaphore;
use Coro::LWP;
use URI;
use LWP::UserAgent;
use Digest::MD5 qw/md5_hex/;
use Try::Tiny;

    my $job = '10';
    my $lock  = {};
    my $queue = {};
    my @done  = ();
    my @fail  = ();
    
    my $debug = 1;
    my $host_sem = 1;
    my $timeout = 10;
    my $main = Coro::Signal->new;

    my $url = 'http://example.jp/';

    &worker( fetcher => \&fetcher, 10 );
    &worker( manager => \&manager, 1 );

    &queue("fetch")->put($url);
    $main->wait;


sub done { push(@done, $_[0]); }
sub fail { push(@fail, $_[0]); }
sub debug { if ($debug) { print $_[0] . "\n"; } }

sub queue {
    my $name = shift;
    $queue->{$name} ||= Coro::Channel->new;
}

sub worker {

    my ($name, $code, $num) = @_;

    for (1 .. $num) {

        my $desc = $name . "_" . &rand_md5();

        async {

            Coro::current->desc($desc);

            while ( 1 ) {

                try {
                    $code->();
                } catch {
                    warn $_;
                    &fail($_);
                };
            }

        };

    }

}

sub manager {

    Coro::Timer::sleep 1;

    my $done = scalar(@done);
    my $fail = scalar(@fail);
    my $active = 0;

    my $task = int(($done + $fail) / $job * 100);
    print "Task: $task % Total: $job Done: $done Fail: $fail\n";

    my @coro = Coro::State::list;

    foreach my $coro (@coro) {

        next if (! $coro->desc);
        next if ($coro->desc !~ /fetcher/);
        next if (! defined($coro->{alive}));

        ## ステータス情報を持つ fetcher がいればカウント
        $active++;

        ## wait フラグがない fetcher のタイムアウト処理
        if ($coro->{alive} <= 0 && ! $coro->{wait}) {

            ## 該当する fetcher をキャンセルして、新しく生成
            $coro->cancel();
            &worker( fetcher => \&fetcher, 1 );

            ## idle ステータス以外の場合は fail カウント
            if ($coro->{status} !~ /idle/) {

                &fail("timeout: ", $coro->desc);
                $lock->{$coro->{host}}->up;

            }

            &debug("timeout: " . $coro->desc, "\n");

        } else {
            &debug($coro->desc . " alive = $coro->{alive} cur = $coro->{status}");

            ## wait フラグがない fetcher の生存時間を減算
            if (! $coro->{wait}) { $coro->{alive} -= 1; }
        }
    }

    ## 全ての処理が完了したらメインスレッドへ戻る
    if ( ($done + $fail) >= $job) {

        print "completed: Done => $done Fail => $fail\n";
        $main->send;

    ## active な fetcher がいなければ強制でメインスレッドへ戻る
    } elsif (!$active) {

        &debug("notice: forced termination");
        print "completed: Done => $done Fail => $fail\n";
        $main->send;

    }

}

sub fetcher  {

    my $url = &queue("fetch")->get;
    my $cur = Coro::current;
    my $host = URI->new($url)->host;
    my $sem = $lock->{$host} ||= Coro::Semaphore->new($host_sem);
    my $lwp = LWP::UserAgent->new();
    my $res = 0;

    ## idel -> lock へ
    ## wait on
    $cur->{host} = $host;
    $cur->{status} = "$host [lock]";
    $cur->{alive} = $timeout;
    $cur->{wait} = 1;

    ## ロック処理。セマフォの上限じゃなければ次へ
    $lock->{$host}->down;

    ## lock -> run へ
    ## wait off
    $cur->{status} = "$host [run]"; 
    $cur->{wait} = 0;

    $res = $lwp->get($url);

    if ($res->is_success) {
        &done($url);
    } else {
        &fail($url);
    }

    ## run -> wait へ
    ## wait on
    $cur->{status} = "$host [wait]"; 
    $cur->{wait} = 1;

    ## 接続後のスリープ
    Coro::Timer::sleep 3;

    ## wait -> idle へ
    ## wait off
    $cur->{status} = 'idle'; 
    $cur->{alive} = $timeout;
    $cur->{wait} = 0;
    undef $cur->{host};

    ## ロック解除
    $lock->{$host}->up;

}

sub rand_md5 {
 
    my $max = 9999;
    my $min = 1000;
 
    my $rand = time() . int( rand($max - $min + 1) ) + $min;
    my $md5 = md5_hex($rand);
 
    return($md5);
 
}

これまで、manager スレッドは処理数だけで終了条件を判断していましたが、fetcher の状態もチェックしてみるようにしてみました。

もっと楽な方法があるかもですがとりあえずはこんな感じに。

注意点としては fetcher からまた別のスレッドへ処理を移した際に、そのスレッドが物凄く重かったりすると多分道連れで終了してしまうはず。これを書いた後に気付いたけど今のところはそのような処理もないのでまたこれで様子をみたいと思います。