work.log

エンジニアの備忘録的ブログ

CoroのLWPを使うスレッドを一定条件で強制終了させる

投稿:2014-05-13 19:07  更新:

Coro のメモ書き。

下記記事で作成した Coro を使ったクローラをベースに、まともなヤツを書いて動かしていたのですが早速問題が発生。

Coro の処理からメインスレッドに戻るのに、成功、失敗を含めた全処理数をチェックして完了ならメインスレッドへという事をしていましたが、何故かこの処理数が合わない現象が稀に発生してメインスレッドに戻れない現象が。

失敗例はこんな感じ。

Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = example.jp [wait]
Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = example.jp [wait]
Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = example.jp [wait]
 
## 処理の成功可否が何処かでロスト
## 以降、永遠に終了条件に達しなくなる
 
Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = idle
Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = idle
Task: 10 % Total: 10 Done: 1 Fail: 0
fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = idle

これはマズいという事で、下記のような強制終了機能を付け加えてみました。ちなみにこのコードはテストようなので絶対に強制終了します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#!/usr/bin/perl
 
use strict;
use warnings;
use Coro;
use Coro::Timer;
use Coro::Semaphore;
use Coro::LWP;
use URI;
use LWP::UserAgent;
use Digest::MD5 qw/md5_hex/;
use Try::Tiny;
 
    my $job = '10';
    my $lock  = {};
    my $queue = {};
    my @done  = ();
    my @fail  = ();
     
    my $debug = 1;
    my $host_sem = 1;
    my $timeout = 10;
    my $main = Coro::Signal->new;
 
    my $url = 'http://example.jp/';
 
    &worker( fetcher => \&fetcher, 10 );
    &worker( manager => \&manager, 1 );
 
    &queue("fetch")->put($url);
    $main->wait;
 
 
sub done { push(@done, $_[0]); }
sub fail { push(@fail, $_[0]); }
sub debug { if ($debug) { print $_[0] . "\n"; } }
 
sub queue {
    my $name = shift;
    $queue->{$name} ||= Coro::Channel->new;
}
 
sub worker {
 
    my ($name, $code, $num) = @_;
 
    for (1 .. $num) {
 
        my $desc = $name . "_" . &rand_md5();
 
        async {
 
            Coro::current->desc($desc);
 
            while ( 1 ) {
 
                try {
                    $code->();
                } catch {
                    warn $_;
                    &fail($_);
                };
            }
 
        };
 
    }
 
}
 
sub manager {
 
    Coro::Timer::sleep 1;
 
    my $done = scalar(@done);
    my $fail = scalar(@fail);
    my $active = 0;
 
    my $task = int(($done + $fail) / $job * 100);
    print "Task: $task % Total: $job Done: $done Fail: $fail\n";
 
    my @coro = Coro::State::list;
 
    foreach my $coro (@coro) {
 
        next if (! $coro->desc);
        next if ($coro->desc !~ /fetcher/);
        next if (! defined($coro->{alive}));
 
        ## ステータス情報を持つ fetcher がいればカウント
        $active++;
 
        ## wait フラグがない fetcher のタイムアウト処理
        if ($coro->{alive} <= 0 && ! $coro->{wait}) {
 
            ## 該当する fetcher をキャンセルして、新しく生成
            $coro->cancel();
            &worker( fetcher => \&fetcher, 1 );
 
            ## idle ステータス以外の場合は fail カウント
            if ($coro->{status} !~ /idle/) {
 
                &fail("timeout: ", $coro->desc);
                $lock->{$coro->{host}}->up;
 
            }
 
            &debug("timeout: " . $coro->desc, "\n");
 
        } else {
            &debug($coro->desc . " alive = $coro->{alive} cur = $coro->{status}");
 
            ## wait フラグがない fetcher の生存時間を減算
            if (! $coro->{wait}) { $coro->{alive} -= 1; }
        }
    }
 
    ## 全ての処理が完了したらメインスレッドへ戻る
    if ( ($done + $fail) >= $job) {
 
        print "completed: Done => $done Fail => $fail\n";
        $main->send;
 
    ## active な fetcher がいなければ強制でメインスレッドへ戻る
    } elsif (!$active) {
 
        &debug("notice: forced termination");
        print "completed: Done => $done Fail => $fail\n";
        $main->send;
 
    }
 
}
 
sub fetcher  {
 
    my $url = &queue("fetch")->get;
    my $cur = Coro::current;
    my $host = URI->new($url)->host;
    my $sem = $lock->{$host} ||= Coro::Semaphore->new($host_sem);
    my $lwp = LWP::UserAgent->new();
    my $res = 0;
 
    ## idel -> lock へ
    ## wait on
    $cur->{host} = $host;
    $cur->{status} = "$host [lock]";
    $cur->{alive} = $timeout;
    $cur->{wait} = 1;
 
    ## ロック処理。セマフォの上限じゃなければ次へ
    $lock->{$host}->down;
 
    ## lock -> run へ
    ## wait off
    $cur->{status} = "$host [run]";
    $cur->{wait} = 0;
 
    $res = $lwp->get($url);
 
    if ($res->is_success) {
        &done($url);
    } else {
        &fail($url);
    }
 
    ## run -> wait へ
    ## wait on
    $cur->{status} = "$host [wait]";
    $cur->{wait} = 1;
 
    ## 接続後のスリープ
    Coro::Timer::sleep 3;
 
    ## wait -> idle へ
    ## wait off
    $cur->{status} = 'idle';
    $cur->{alive} = $timeout;
    $cur->{wait} = 0;
    undef $cur->{host};
 
    ## ロック解除
    $lock->{$host}->up;
 
}
 
sub rand_md5 {
  
    my $max = 9999;
    my $min = 1000;
  
    my $rand = time() . int( rand($max - $min + 1) ) + $min;
    my $md5 = md5_hex($rand);
  
    return($md5);
  
}

これまで、manager スレッドは処理数だけで終了条件を判断していましたが、fetcher の状態もチェックしてみるようにしてみました。

もっと楽な方法があるかもですがとりあえずはこんな感じに。

注意点としては fetcher からまた別のスレッドへ処理を移した際に、そのスレッドが物凄く重かったりすると多分道連れで終了してしまうはず。これを書いた後に気付いたけど今のところはそのような処理もないのでまたこれで様子をみたいと思います。

スポンサーリンク

コメント

コメントを残す

よく読まれている記事

  • 今日
  • 週間
  • 月間