Coro のメモ書き。
下記記事で作成した Coro を使ったクローラをベースに、まともなヤツを書いて動かしていたのですが早速問題が発生。
Coro の処理からメインスレッドに戻るのに、成功、失敗を含めた全処理数をチェックして完了ならメインスレッドへという事をしていましたが、何故かこの処理数が合わない現象が稀に発生してメインスレッドに戻れない現象が。
失敗例はこんな感じ。
Task: 10 % Total: 10 Done: 1 Fail: 0 fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = example.jp [wait] Task: 10 % Total: 10 Done: 1 Fail: 0 fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = example.jp [wait] Task: 10 % Total: 10 Done: 1 Fail: 0 fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = example.jp [wait] ## 処理の成功可否が何処かでロスト ## 以降、永遠に終了条件に達しなくなる Task: 10 % Total: 10 Done: 1 Fail: 0 fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = idle Task: 10 % Total: 10 Done: 1 Fail: 0 fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = idle Task: 10 % Total: 10 Done: 1 Fail: 0 fetcher_e1a07b2e0581d01f8b1020e5051e83dd alive = 10 cur = idle ・ ・ ・ |
これはマズいという事で、下記のような強制終了機能を付け加えてみました。ちなみにこのコードはテストようなので絶対に強制終了します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 | #!/usr/bin/perl use strict; use warnings; use Coro; use Coro::Timer; use Coro::Semaphore; use Coro::LWP; use URI; use LWP::UserAgent; use Digest::MD5 qw/md5_hex/ ; use Try::Tiny; my $job = '10' ; my $lock = {}; my $queue = {}; my @done = (); my @fail = (); my $debug = 1; my $host_sem = 1; my $timeout = 10; my $main = Coro::Signal->new; my $url = 'http://example.jp/' ; &worker ( fetcher => \ &fetcher , 10 ); &worker ( manager => \ &manager , 1 ); &queue ( "fetch" )->put( $url ); $main -> wait ; sub done { push ( @done , $_ [0]); } sub fail { push ( @fail , $_ [0]); } sub debug { if ( $debug ) { print $_ [0] . "\n" ; } } sub queue { my $name = shift ; $queue ->{ $name } ||= Coro::Channel->new; } sub worker { my ( $name , $code , $num ) = @_ ; for (1 .. $num ) { my $desc = $name . "_" . &rand_md5 (); async { Coro::current->desc( $desc ); while ( 1 ) { try { $code ->(); } catch { warn $_ ; &fail ( $_ ); }; } }; } } sub manager { Coro::Timer:: sleep 1; my $done = scalar ( @done ); my $fail = scalar ( @fail ); my $active = 0; my $task = int (( $done + $fail ) / $job * 100); print "Task: $task % Total: $job Done: $done Fail: $fail\n" ; my @coro = Coro::State::list; foreach my $coro ( @coro ) { next if (! $coro ->desc); next if ( $coro ->desc !~ /fetcher/); next if (! defined ( $coro ->{alive})); ## ステータス情報を持つ fetcher がいればカウント $active ++; ## wait フラグがない fetcher のタイムアウト処理 if ( $coro ->{alive} <= 0 && ! $coro ->{ wait }) { ## 該当する fetcher をキャンセルして、新しく生成 $coro ->cancel(); &worker ( fetcher => \ &fetcher , 1 ); ## idle ステータス以外の場合は fail カウント if ( $coro ->{status} !~ /idle/) { &fail ( "timeout: " , $coro ->desc); $lock ->{ $coro ->{host}}->up; } &debug ( "timeout: " . $coro ->desc, "\n" ); } else { &debug ( $coro ->desc . " alive = $coro->{alive} cur = $coro->{status}" ); ## wait フラグがない fetcher の生存時間を減算 if (! $coro ->{ wait }) { $coro ->{alive} -= 1; } } } ## 全ての処理が完了したらメインスレッドへ戻る if ( ( $done + $fail ) >= $job ) { print "completed: Done => $done Fail => $fail\n" ; $main -> send ; ## active な fetcher がいなければ強制でメインスレッドへ戻る } elsif (! $active ) { &debug ( "notice: forced termination" ); print "completed: Done => $done Fail => $fail\n" ; $main -> send ; } } sub fetcher { my $url = &queue ( "fetch" )->get; my $cur = Coro::current; my $host = URI->new( $url )->host; my $sem = $lock ->{ $host } ||= Coro::Semaphore->new( $host_sem ); my $lwp = LWP::UserAgent->new(); my $res = 0; ## idel -> lock へ ## wait on $cur ->{host} = $host ; $cur ->{status} = "$host [lock]" ; $cur ->{alive} = $timeout ; $cur ->{ wait } = 1; ## ロック処理。セマフォの上限じゃなければ次へ $lock ->{ $host }->down; ## lock -> run へ ## wait off $cur ->{status} = "$host [run]" ; $cur ->{ wait } = 0; $res = $lwp ->get( $url ); if ( $res ->is_success) { &done ( $url ); } else { &fail ( $url ); } ## run -> wait へ ## wait on $cur ->{status} = "$host [wait]" ; $cur ->{ wait } = 1; ## 接続後のスリープ Coro::Timer:: sleep 3; ## wait -> idle へ ## wait off $cur ->{status} = 'idle' ; $cur ->{alive} = $timeout ; $cur ->{ wait } = 0; undef $cur ->{host}; ## ロック解除 $lock ->{ $host }->up; } sub rand_md5 { my $max = 9999; my $min = 1000; my $rand = time () . int ( rand ( $max - $min + 1) ) + $min ; my $md5 = md5_hex( $rand ); return ( $md5 ); } |
これまで、manager スレッドは処理数だけで終了条件を判断していましたが、fetcher の状態もチェックしてみるようにしてみました。
もっと楽な方法があるかもですがとりあえずはこんな感じに。
注意点としては fetcher からまた別のスレッドへ処理を移した際に、そのスレッドが物凄く重かったりすると多分道連れで終了してしまうはず。これを書いた後に気付いたけど今のところはそのような処理もないのでまたこれで様子をみたいと思います。