work.log

元エンジニアの備忘録的ブログ

Coroで作成したスレッドの実行順序を制御する

投稿:2014-04-18 18:46  更新:

Coro のメモ書き。

下記の記事でセマフォによる同時スレッド数の制御と、簡単な HTML パーサ機能を付け加えましたがこれだけでは全然使い物にならないので、新しく別の処理を追加してみたいと思います。

特に、並行化する必要のない処理であればメインスレッドに戻って来たあとに必要なコードをかけばそれでお終いですが、今回は並行化する処理を 2 つ用意して順に処理されるようにしてみました。

前回までは、schedule でメインスレッドから処理を移していましたが、今回は Coro::Signal を使ってそれぞれのタスクを制御しながら実行してみます。

それぞれのタスクは下記のようなものです。

タスク 1

  1. url リストより feed 情報を取得
  2. feed の解析処理
  3. 解析したデータよりエントリー url を抜き出す

タスク 2

  1. タスク 1 で取得した url リストからコンテンツを取得
  2. コンテンツの解析処理
  3. 解析したデータからタイトルを抜き出す

では、早速コード。

#!/usr/bin/perl

use strict;
use warnings;
use Coro;
use Coro::Timer;
use Coro::Semaphore;
use Coro::LWP;
use LWP::UserAgent;
use URI;
use XML::FeedPP;
use HTML::TreeBuilder;
use Digest::MD5 qw/md5_hex/;
use List::Util;

    my $job   = '';
    my $lock  = {};
    my $queue = {};
    my @url   = ();
    my @done  = ();
    my @fail  = ();

    my $timeout = 15;
    my $main = Coro::Signal->new;

    my @feed = (
           'https://worklog.be/feed',
           'http://example.jp/feed',
           'http://example.com/feed',
       );

    ## Task 1
    ## manager, fetcher, feed 用の parser を生成
    worker( fetcher => \&fetcher, 5 );
    worker( parser => \&feed_parser, 1 );
    manager();

    ## fetcher に feed url を送りつける
    queue("fetch")->put($_) for @feed;

    ## job 数のセット
    $job = scalar(@feed);

    print "# --------------------\n";
    print "# Task 1\n";
    print "# --------------------\n";

    ## メインスレッドを wait して切り替え
    $main->wait;

    ## Task 1 が完了後ここに戻る

    print "\n";
    print "# --------------------\n";
    print "# Task 2\n";
    print "# --------------------\n";

    ## Task 2
    @done = (); 
    @fail = (); 
    @url = List::Util::shuffle @url;

    ## html 用の parser 生成
    ## manager, fetcher は眠っているだけなので起こして再利用
    worker( parser => \&parser, 1 );

    queue("fetch")->put($_) for @url;
    $job = scalar(@url);

    ## メインスレッドを wait
    $main->wait;

    ## タスク完了後に取得したタイトルを表示
    foreach (@done) {
        print "$_\n";
    }

sub done {
    push(@done, $_[0]);
}

sub fail {
    push(@fail, $_[0]);
}

sub queue {
    my $name = shift;
    $queue->{$name} ||= Coro::Channel->new;
}

sub worker {

    my ($name, $code, $num) = @_;

    for (1 .. $num) {

        my $desc = $name . "_" . rand_md5();

        async {

            Coro::current->desc($desc);

            while ( 1 ) {
                $code->();
            }

        };

    }

}

sub manager {

    async {

        while ( 1 ) {

            Coro::Timer::sleep 1;

            my $done = scalar(@done);
            my $fail = scalar(@fail);
            my $task = int(($done + $fail) / $job * 100);

            print "Task: $task % Total: $job Done: $done Fail: $fail\n";

            ## fetcher のタイムアウト処理
            my @fetcher =  Coro::State::list;

            foreach my $fetcher (@fetcher) {

                next if (!$fetcher->desc);
                next if ($fetcher->desc !~ /fetcher/);
                next if (! defined($fetcher->{alive}));

                if ($fetcher->{alive} <= 0 && ! $fetcher->{idle}) {

                    print "timeout: ", $fetcher->desc, "\n";
                    $fetcher->cancel();
                    fail("timeout: ", $fetcher->desc);

                    $lock->{$fetcher->{host}}->up;

                    worker( fetcher => \&fetcher, 1 );

                } else {

                    print "alive: ", $fetcher->desc, " keep alive = $fetcher->{alive} cur = $fetcher->{status}\n";

                    if (! $fetcher->{idle}) { $fetcher->{alive} -= 1; }

                }


            }

            ## 全タスク完了後の処理
            if ( ($done + $fail) >= $job) {

                print "completed: Done => $done Fail => $fail\n";
                Coro::Timer::sleep 5;

                ## parser のスレッドは使いまわせないので全てキャンセル
                my @coro = Coro::State::list;
                foreach my $coro (@coro) {
                    next if (!$coro->desc);
                    next if ($coro->desc !~ /parser/);
                    $coro->cancel();
                }

                ## メインスレッドへ戻る
                $main->send;

            }

        }
    };

}

sub fetcher  {

    my $url = queue("fetch")->get;
    my $host = URI->new($url)->host;
    my $sem = $lock->{$host} ||= Coro::Semaphore->new(2);
    my $lwp = '';
    my $res = 0;

    $Coro::current->{host} = $host;
    $Coro::current->{status} = "$host [lock]";

    $lock->{$host}->down;

    $Coro::current->{status} = "$host [run]"; 
    $Coro::current->{idle} = 0;
    $Coro::current->{alive} = $timeout;

    $lwp = LWP::UserAgent->new();
    $res = $lwp->get($url);

    if ($res->is_success) {
        queue("parser")->put($res->content);
    } else {
        fail("fail:$url");
    }

    $Coro::current->{status} = "$host [wait]"; 
    $Coro::current->{idle} = 1;

    Coro::Timer::sleep 3;

    $Coro::current->{host} = 'none';
    $Coro::current->{status} = 'idle'; 
    $Coro::current->{alive} = $timeout;

    $lock->{$host}->up;

}

## html 用の解析処理
sub parser {

    my $content = queue("parser")->get;

    my $tree = HTML::TreeBuilder->new;
    $tree->parse($content);
    $tree->eof();

    if ($tree->find('title')) {

        foreach ($tree->find('title')) {

            done($_->as_text);
            last;

        }

    } else {

        fail("notitle");

    }

}

## feed 用の解析処理
sub feed_parser {

    my $content = queue("parser")->get;

    my $feed = XML::FeedPP->new($content);

    foreach ($feed->get_item()) {
        push(@url, $_->link());
    }

    done('OK');

}

sub rand_md5 {

    my $max = 9999;
    my $min = 1000;

    my $rand = time() . int( rand($max - $min + 1) ) + $min;
    my $md5 = md5_hex($rand);

    return($md5);

}

結果。

# --------------------
# Task 1
# --------------------
Task: 100 % Total: 3 Done: 3 Fail: 0
alive: fetcher_abf438a5a3d8338750ff1489a2238acc keep alive = 15 cur = worklog.be [wait]
alive: fetcher_1710fee1d8dbbaf9ee8e6bcf437aab79 keep alive = 15 cur = example.jp [wait]
alive: fetcher_fbd7bedb9272198b9807729e4d95deaa keep alive = 15 cur = example.com [wait]

completed: Done => 3 Fail => 0

# --------------------
# Task 2
# --------------------
Task: 10 % Total: 30 Done: 4 Fail: 0
alive: fetcher_cd08a3c5dafe5a50cb2ead5d908be621 keep alive = 15 cur = example.com [wait]
alive: fetcher_4d1149929ba36b875dda039ee8d003f5 keep alive = 15 cur = example.jp [wait]
alive: fetcher_abf438a5a3d8338750ff1489a2238acc keep alive = 15 cur = example.com [wait]
alive: fetcher_1710fee1d8dbbaf9ee8e6bcf437aab79 keep alive = 15 cur = example.com [lock]
alive: fetcher_fbd7bedb9272198b9807729e4d95deaa keep alive = 15 cur = worklog.be [wait]

Task: 10 % Total: 30 Done: 4 Fail: 0
alive: fetcher_cd08a3c5dafe5a50cb2ead5d908be621 keep alive = 15 cur = example.com [wait]
alive: fetcher_4d1149929ba36b875dda039ee8d003f5 keep alive = 15 cur = example.jp [wait]
alive: fetcher_abf438a5a3d8338750ff1489a2238acc keep alive = 15 cur = example.com [wait]
alive: fetcher_1710fee1d8dbbaf9ee8e6bcf437aab79 keep alive = 15 cur = example.com [lock]
alive: fetcher_fbd7bedb9272198b9807729e4d95deaa keep alive = 15 cur = worklog.be [wait]
・
・
・
completed: Done => 30 Fail => 0

? 以下取得できたタイトル ?

Coro::Signal の wait => send の方が見た目的にも明確なのでいいかもしれない。

Coro の実行制御はこれで以上です。

おすすめのVPSサーバ

  • OSが選べる
  • VPS同士でLANが組める
  • 複数台構成向き

このブログで使っています。

  • 転送量が多いサービスに
  • 借りてるのは3年間一度もdown無し!

よく見られている記事

  • 本日
  • 週間
  • 月間