File Coverage

blib/lib/Bot/Cobalt/Plugin/Calc/Session.pm
Criterion Covered Total %
statement 96 117 82.0
branch 8 20 40.0
condition 6 14 42.8
subroutine 22 23 95.6
pod 0 13 0.0
total 132 187 70.5


line stmt bran cond sub pod time code
1             package Bot::Cobalt::Plugin::Calc::Session;
2             $Bot::Cobalt::Plugin::Calc::Session::VERSION = '0.004005';
3 2     2   14173 use v5.10;
  2         4  
4              
5 2     2   7 use Config;
  2         2  
  2         54  
6              
7 2     2   5 use Carp;
  2         2  
  2         98  
8 2     2   6 use strictures 2;
  2         9  
  2         55  
9              
10 2     2   1086 use Time::HiRes ();
  2         1884  
  2         55  
11              
12 2     2   854 use POE 'Wheel::Run', 'Filter::Reference';
  2         55759  
  2         11  
13              
14             sub TIMEOUT () { 0 }
15             sub SESSID () { 1 }
16             sub WHEELS () { 2 }
17             sub REQUESTS () { 3 }
18             sub TAG_BY_WID () { 4 }
19             sub PENDING () { 5 }
20             sub MAX_WORKERS () { 6 }
21             sub RESULT_EVENT () { 7 }
22             sub ERROR_EVENT () { 8 }
23              
24             sub new {
25 2     2 0 29 my ($class, %params) = @_;
26              
27             # note the Worker proc also has a RLIMIT_CPU in place (where supported):
28 2   50     10 my $timeout = $params{timeout} || 2;
29 2   50     9 my $maxwrk = $params{max_workers} || 4;
30              
31 2   50     10 my $result_event = $params{result_event} || 'calc_result';
32 2   50     11 my $error_event = $params{error_event} || 'calc_error';
33            
34 2         9 bless [
35             $timeout, # TIMEOUT
36             undef, # SESSID
37             +{}, # WHEELS
38             +{}, # REQUESTS
39             +{}, # TAG_BY_WID
40             [], # PENDING
41             $maxwrk, # MAX_WORKERS
42             $result_event, # RESULT_EVENT
43             $error_event, # ERROR_EVENT
44             ], $class
45             }
46              
47 2     2 0 1825 sub session_id { shift->[SESSID] }
48              
49 1     1   2479 sub _wheels { shift->[WHEELS] }
50 1     1   5 sub _tag_by_wid { shift->[TAG_BY_WID] }
51 1     1   4 sub _requests { shift->[REQUESTS] }
52              
53             sub start {
54 1     1 0 227 my $self = shift;
55              
56 1         17 my $sess = POE::Session->create(
57             object_states => [
58             $self => +{
59             _start => 'px_start',
60             shutdown => 'px_shutdown',
61             cleanup => 'px_cleanup',
62              
63             calc => 'px_calc',
64             push => 'px_push',
65            
66             worker_timeout => 'px_worker_timeout',
67             worker_input => 'px_worker_input',
68             worker_stderr => 'px_worker_stderr',
69             worker_sigchld => 'px_worker_sigchld',
70             worker_closed => 'px_worker_closed',
71             },
72             ],
73             );
74              
75 1         132 $self->[SESSID] = $sess->ID;
76              
77 1         3 $self
78             }
79              
80              
81             sub px_start {
82 1     1 0 208 my ($kernel, $self) = @_[KERNEL, OBJECT];
83 1         3 $kernel->refcount_increment( $_[SESSION]->ID, 'Waiting for requests' );
84             }
85              
86             sub px_shutdown {
87 1     1 0 131 my ($kernel, $self) = @_[KERNEL, OBJECT];
88 1         4 $kernel->call( $_[SESSION], 'cleanup' );
89 1         8 $kernel->refcount_decrement( $_[SESSION]->ID, 'Waiting for requests' );
90             }
91              
92             sub px_cleanup {
93 1     1 0 37 my ($kernel, $self) = @_[KERNEL, OBJECT];
94 1         3 for my $pid (keys %{ $self->[WHEELS] }) {
  1         5  
95 1 50       6 if (my $wheel = delete $self->[WHEELS]->{$pid}) {
96 1         8 $wheel->kill('TERM')
97             }
98             }
99 1         28 $self->[TAG_BY_WID] = +{};
100 1         5 $self->[PENDING] = [];
101             }
102              
103             sub px_calc {
104             # calc => $expr, $hints
105 1     1 0 225 my ($kernel, $self) = @_[KERNEL, OBJECT];
106 1         2 my ($expr, $hints) = @_[ARG0, ARG1];
107 1         3 my $sender_id = $_[SENDER]->ID;
108              
109 1 50       5 unless (defined $expr) {
110 0         0 warn "'calc' event expected an EXPR and optional hints scalar";
111 0   0     0 $kernel->post( $sender_id => $self->[ERROR_EVENT] =>
112             "EXPR not defined",
113             $hints // +{}
114             );
115             }
116              
117 1         5 state $p = [ 'a' .. 'z', 1 .. 9 ];
118 1         1 my $tag = join '', map {; $p->[rand @$p] } 1 .. 3;
  3         43  
119 1         16 $tag .= $p->[rand @$p] while exists $self->[REQUESTS]->{$tag};
120              
121 1   50     6 my $pending = +{
122             expr => $expr,
123             tag => $tag,
124             hints => ($hints // +{}),
125             sender_id => $sender_id,
126             };
127              
128 1         1 $self->[REQUESTS]->{$tag} = $pending;
129 1         4 push @{ $self->[PENDING] }, $pending;
  1         3  
130 1         4 $kernel->yield('push');
131             }
132              
133             sub px_push {
134 1     1 0 152 my ($kernel, $self) = @_[KERNEL, OBJECT];
135              
136 1 50       1 return unless @{ $self->[PENDING] };
  1         4  
137              
138 1 50       1 if (keys %{ $self->[WHEELS] } >= $self->[MAX_WORKERS]) {
  1         4  
139 0         0 $kernel->delay( push => 0.5 );
140             return
141 0         0 }
142              
143 1         2 my $wheel = $self->_create_wheel;
144              
145 1         1 my $next = shift @{ $self->[PENDING] };
  1         3  
146 1         2 my $tag = $next->{tag};
147 1         7 $self->[TAG_BY_WID]->{ $wheel->ID } = $tag;
148              
149 1         8 $kernel->delay( worker_timeout => $self->[TIMEOUT], $wheel );
150 1         82 $wheel->put( [ $next->{tag}, $next->{expr} ] );
151             }
152              
153             sub _create_wheel {
154 1     1   1 my ($self) = @_;
155            
156 1         68 my $ppath = $Config{perlpath};
157 1 50       5 if ($^O ne 'VMS') {
158 1 50       30 $ppath .= $Config{_exe} unless $ppath =~ m/$Config{_exe}$/i;
159             }
160            
161 1         2 my $forkable;
162 1 50       17 if ($^O eq 'MSWin32') {
163 0         0 require Bot::Cobalt::Plugin::Calc::Worker;
164 0         0 $forkable = \&Bot::Cobalt::Plugin::Calc::Worker::worker
165             } else {
166             $forkable = [
167             $ppath,
168 1         3 (map {; '-I'.$_ } @INC),
  11         16  
169             '-MBot::Cobalt::Plugin::Calc::Worker',
170             '-e',
171             'Bot::Cobalt::Plugin::Calc::Worker->worker'
172             ]
173             }
174              
175 1         10 my $wheel = POE::Wheel::Run->new(
176             Program => $forkable,
177             StdioFilter => POE::Filter::Reference->new,
178             StderrEvent => 'worker_stderr',
179             StdoutEvent => 'worker_input',
180             CloseEvent => 'worker_closed',
181             );
182              
183 1         2573 my $pid = $wheel->PID;
184 1         9 $poe_kernel->sig_child($pid, 'worker_sigchld');
185 1         92 $self->[WHEELS]->{$pid} = $wheel;
186              
187 1         6 $wheel
188             }
189              
190             sub px_worker_timeout {
191 1     1 0 1952999 my ($kernel, $self) = @_[KERNEL, OBJECT];
192 1         4 my $wheel = $_[ARG0];
193 1         7 $wheel->kill('INT');
194             }
195              
196             sub px_worker_input {
197 1     1 0 46359 my ($kernel, $self) = @_[KERNEL, OBJECT];
198            
199 1         3 my ($input, $wid) = @_[ARG0, ARG1];
200 1         3 my ($tag, $result) = @$input;
201            
202 1         4 my $req = delete $self->[REQUESTS]->{$tag};
203 1 50       5 unless ($req) {
204 0         0 warn "BUG? worker input but no request found for tag '$tag'";
205             return
206 0         0 }
207            
208             $kernel->post( $req->{sender_id} => $self->[RESULT_EVENT] =>
209             $result, $req->{hints}
210             )
211 1         12 }
212              
213             sub px_worker_stderr {
214 0     0 0 0 my ($kernel, $self) = @_[KERNEL, OBJECT];
215 0         0 my ($input, $wid) = @_[ARG0, ARG1];
216 0         0 my $tag = $self->[TAG_BY_WID]->{$wid};
217 0 0       0 unless (defined $tag) {
218 0         0 warn
219             "BUG? px_worker_stderr but no tag for wheel ID '$wid': '$input'";
220             }
221 0         0 my $req = delete $self->[REQUESTS]->{$tag};
222 0 0       0 if (defined $req) {
223 0         0 my $sender_id = $req->{sender_id};
224 0         0 my $hints = $req->{hints};
225 0         0 $kernel->post( $req->{sender_id} => $self->[ERROR_EVENT] =>
226             "worker '$wid': $input", $hints
227             )
228             } else {
229 0         0 warn "stderr from worker but request unavailable: '$input'"
230             }
231             }
232              
233             sub px_worker_closed {
234 1     1 0 431 my ($kernel, $self) = @_[KERNEL, OBJECT];
235 1         1 my $wid = $_[ARG0];
236 1         3 delete $self->[TAG_BY_WID]->{$wid};
237             }
238              
239             sub px_worker_sigchld {
240 1     1 0 532 my ($kernel, $self) = @_[KERNEL, OBJECT];
241 1         3 my $pid = $_[ARG1];
242 1   50     7 my $wheel = delete $self->[WHEELS]->{$pid} || return;
243 0           delete $self->[TAG_BY_WID]->{ $wheel->ID };
244 0           $kernel->yield('push')
245             }
246              
247              
248             1;