File Coverage

blib/lib/Bot/Cobalt/Plugin/Calc/Session.pm
Criterion Covered Total %
statement 96 117 82.0
branch 8 20 40.0
condition 6 14 42.8
subroutine 22 23 95.6
pod 0 13 0.0
total 132 187 70.5


line stmt bran cond sub pod time code
1             package Bot::Cobalt::Plugin::Calc::Session;
2             $Bot::Cobalt::Plugin::Calc::Session::VERSION = '0.004004';
3 2     2   14834 use v5.10;
  2         4  
4              
5 2     2   6 use Config;
  2         2  
  2         52  
6              
7 2     2   6 use Carp;
  2         2  
  2         96  
8 2     2   6 use strictures 2;
  2         9  
  2         56  
9              
10 2     2   1158 use Time::HiRes ();
  2         1957  
  2         58  
11              
12 2     2   864 use POE 'Wheel::Run', 'Filter::Reference';
  2         56035  
  2         9  
13              
14             sub TIMEOUT () { 0 }
15             sub SESSID () { 1 }
16             sub WHEELS () { 2 }
17             sub REQUESTS () { 3 }
18             sub TAG_BY_WID () { 4 }
19             sub PENDING () { 5 }
20             sub MAX_WORKERS () { 6 }
21             sub RESULT_EVENT () { 7 }
22             sub ERROR_EVENT () { 8 }
23              
24             sub new {
25 2     2 0 29 my ($class, %params) = @_;
26              
27             # note the Worker proc also has a RLIMIT_CPU in place (where supported):
28 2   50     10 my $timeout = $params{timeout} || 2;
29 2   50     8 my $maxwrk = $params{max_workers} || 4;
30              
31 2   50     9 my $result_event = $params{result_event} || 'calc_result';
32 2   50     8 my $error_event = $params{error_event} || 'calc_error';
33            
34 2         10 bless [
35             $timeout, # TIMEOUT
36             undef, # SESSID
37             +{}, # WHEELS
38             +{}, # REQUESTS
39             +{}, # TAG_BY_WID
40             [], # PENDING
41             $maxwrk, # MAX_WORKERS
42             $result_event, # RESULT_EVENT
43             $error_event, # ERROR_EVENT
44             ], $class
45             }
46              
47 2     2 0 1785 sub session_id { shift->[SESSID] }
48              
49 1     1   2528 sub _wheels { shift->[WHEELS] }
50 1     1   4 sub _tag_by_wid { shift->[TAG_BY_WID] }
51 1     1   5 sub _requests { shift->[REQUESTS] }
52              
53             sub start {
54 1     1 0 204 my $self = shift;
55              
56 1         13 my $sess = POE::Session->create(
57             object_states => [
58             $self => +{
59             _start => 'px_start',
60             shutdown => 'px_shutdown',
61             cleanup => 'px_cleanup',
62              
63             calc => 'px_calc',
64             push => 'px_push',
65            
66             worker_timeout => 'px_worker_timeout',
67             worker_input => 'px_worker_input',
68             worker_stderr => 'px_worker_stderr',
69             worker_sigchld => 'px_worker_sigchld',
70             worker_closed => 'px_worker_closed',
71             },
72             ],
73             );
74              
75 1         125 $self->[SESSID] = $sess->ID;
76              
77 1         3 $self
78             }
79              
80              
81             sub px_start {
82 1     1 0 168 my ($kernel, $self) = @_[KERNEL, OBJECT];
83 1         3 $kernel->refcount_increment( $_[SESSION]->ID, 'Waiting for requests' );
84             }
85              
86             sub px_shutdown {
87 1     1 0 120 my ($kernel, $self) = @_[KERNEL, OBJECT];
88 1         3 $kernel->call( $_[SESSION], 'cleanup' );
89 1         7 $kernel->refcount_decrement( $_[SESSION]->ID, 'Waiting for requests' );
90             }
91              
92             sub px_cleanup {
93 1     1 0 22 my ($kernel, $self) = @_[KERNEL, OBJECT];
94 1         1 for my $pid (keys %{ $self->[WHEELS] }) {
  1         4  
95 1 50       5 if (my $wheel = delete $self->[WHEELS]->{$pid}) {
96 1         5 $wheel->kill('TERM')
97             }
98             }
99 1         21 $self->[TAG_BY_WID] = +{};
100 1         3 $self->[PENDING] = [];
101             }
102              
103             sub px_calc {
104             # calc => $expr, $hints
105 1     1 0 205 my ($kernel, $self) = @_[KERNEL, OBJECT];
106 1         2 my ($expr, $hints) = @_[ARG0, ARG1];
107 1         2 my $sender_id = $_[SENDER]->ID;
108              
109 1 50       5 unless (defined $expr) {
110 0         0 warn "'calc' event expected an EXPR and optional hints scalar";
111 0   0     0 $kernel->post( $sender_id => $self->[ERROR_EVENT] =>
112             "EXPR not defined",
113             $hints // +{}
114             );
115             }
116              
117 1         9 state $p = [ 'a' .. 'z', 1 .. 9 ];
118 1         2 my $tag = join '', map {; $p->[rand @$p] } 1 .. 3;
  3         31  
119 1         4 $tag .= $p->[rand @$p] while exists $self->[REQUESTS]->{$tag};
120              
121 1   50     6 my $pending = +{
122             expr => $expr,
123             tag => $tag,
124             hints => ($hints // +{}),
125             sender_id => $sender_id,
126             };
127              
128 1         1 $self->[REQUESTS]->{$tag} = $pending;
129 1         1 push @{ $self->[PENDING] }, $pending;
  1         2  
130 1         7 $kernel->yield('push');
131             }
132              
133             sub px_push {
134 1     1 0 149 my ($kernel, $self) = @_[KERNEL, OBJECT];
135              
136 1 50       1 return unless @{ $self->[PENDING] };
  1         3  
137              
138 1 50       1 if (keys %{ $self->[WHEELS] } >= $self->[MAX_WORKERS]) {
  1         4  
139 0         0 $kernel->delay( push => 0.5 );
140             return
141 0         0 }
142              
143 1         3 my $wheel = $self->_create_wheel;
144              
145 1         2 my $next = shift @{ $self->[PENDING] };
  1         3  
146 1         2 my $tag = $next->{tag};
147 1         10 $self->[TAG_BY_WID]->{ $wheel->ID } = $tag;
148              
149 1         9 $kernel->delay( worker_timeout => $self->[TIMEOUT], $wheel );
150 1         78 $wheel->put( [ $next->{tag}, $next->{expr} ] );
151             }
152              
153             sub _create_wheel {
154 1     1   4 my ($self) = @_;
155            
156 1         56 my $ppath = $Config{perlpath};
157 1 50       5 if ($^O ne 'VMS') {
158 1 50       22 $ppath .= $Config{_exe} unless $ppath =~ m/$Config{_exe}$/i;
159             }
160            
161 1         1 my $forkable;
162 1 50       15 if ($^O eq 'MSWin32') {
163 0         0 require Bot::Cobalt::Plugin::Calc::Worker;
164 0         0 $forkable = \&Bot::Cobalt::Plugin::Calc::Worker::worker
165             } else {
166             $forkable = [
167             $ppath,
168 1         2 (map {; '-I'.$_ } @INC),
  11         14  
169             '-MBot::Cobalt::Plugin::Calc::Worker',
170             '-e',
171             'Bot::Cobalt::Plugin::Calc::Worker->worker'
172             ]
173             }
174              
175 1         8 my $wheel = POE::Wheel::Run->new(
176             Program => $forkable,
177             StdioFilter => POE::Filter::Reference->new,
178             StderrEvent => 'worker_stderr',
179             StdoutEvent => 'worker_input',
180             CloseEvent => 'worker_closed',
181             );
182              
183 1         2480 my $pid = $wheel->PID;
184 1         9 $poe_kernel->sig_child($pid, 'worker_sigchld');
185 1         93 $self->[WHEELS]->{$pid} = $wheel;
186              
187 1         5 $wheel
188             }
189              
190             sub px_worker_timeout {
191 1     1 0 1958876 my ($kernel, $self) = @_[KERNEL, OBJECT];
192 1         2 my $wheel = $_[ARG0];
193 1         8 $wheel->kill('INT');
194             }
195              
196             sub px_worker_input {
197 1     1 0 40587 my ($kernel, $self) = @_[KERNEL, OBJECT];
198            
199 1         3 my ($input, $wid) = @_[ARG0, ARG1];
200 1         1 my ($tag, $result) = @$input;
201            
202 1         4 my $req = delete $self->[REQUESTS]->{$tag};
203 1 50       5 unless ($req) {
204 0         0 warn "BUG? worker input but no request found for tag '$tag'";
205             return
206 0         0 }
207            
208             $kernel->post( $req->{sender_id} => $self->[RESULT_EVENT] =>
209             $result, $req->{hints}
210             )
211 1         14 }
212              
213             sub px_worker_stderr {
214 0     0 0 0 my ($kernel, $self) = @_[KERNEL, OBJECT];
215 0         0 my ($input, $wid) = @_[ARG0, ARG1];
216 0         0 my $tag = $self->[TAG_BY_WID]->{$wid};
217 0 0       0 unless (defined $tag) {
218 0         0 warn "BUG? px_worker_stderr but no tag for wheel ID '$wid'";
219             }
220 0         0 my $req = delete $self->[REQUESTS]->{$tag};
221 0 0       0 if (defined $req) {
222 0         0 my $sender_id = $req->{sender_id};
223 0         0 my $hints = $req->{hints};
224 0         0 $kernel->post( $req->{sender_id} => $self->[ERROR_EVENT] =>
225             "worker '$wid': $input", $hints
226             )
227             } else {
228 0         0 warn "stderr from worker but request unavailable: $input"
229             }
230             }
231              
232             sub px_worker_closed {
233 1     1 0 514 my ($kernel, $self) = @_[KERNEL, OBJECT];
234 1         3 my $wid = $_[ARG0];
235 1         2 delete $self->[TAG_BY_WID]->{$wid};
236             }
237              
238             sub px_worker_sigchld {
239 1     1 0 398 my ($kernel, $self) = @_[KERNEL, OBJECT];
240 1         2 my $pid = $_[ARG1];
241 1   50     4 my $wheel = delete $self->[WHEELS]->{$pid} || return;
242 0           delete $self->[TAG_BY_WID]->{ $wheel->ID };
243 0           $kernel->yield('push')
244             }
245              
246              
247             1;