File Coverage

blib/lib/Bot/Cobalt/Plugin/Calc/Session.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package Bot::Cobalt::Plugin::Calc::Session;
2             $Bot::Cobalt::Plugin::Calc::Session::VERSION = '0.004003';
3 1     1   15815 use v5.10;
  1         2  
  1         30  
4              
5 1     1   3 use Config;
  1         2  
  1         30  
6              
7 1     1   3 use Carp;
  1         1  
  1         68  
8 1     1   3 use strictures 2;
  1         4  
  1         26  
9              
10 1     1   645 use Time::HiRes ();
  1         1334  
  1         28  
11              
12 1     1   871 use POE 'Wheel::Run', 'Filter::Reference';
  0            
  0            
13              
14             sub TIMEOUT () { 0 }
15             sub SESSID () { 1 }
16             sub WHEELS () { 2 }
17             sub REQUESTS () { 3 }
18             sub TAG_BY_WID () { 4 }
19             sub PENDING () { 5 }
20             sub MAX_WORKERS () { 6 }
21             sub RESULT_EVENT () { 7 }
22             sub ERROR_EVENT () { 8 }
23              
24             sub new {
25             my ($class, %params) = @_;
26              
27             # note the Worker proc also has a RLIMIT_CPU in place (where supported):
28             my $timeout = $params{timeout} || 2;
29             my $maxwrk = $params{max_workers} || 4;
30              
31             my $result_event = $params{result_event} || 'calc_result';
32             my $error_event = $params{error_event} || 'calc_error';
33            
34             bless [
35             $timeout, # TIMEOUT
36             undef, # SESSID
37             +{}, # WHEELS
38             +{}, # REQUESTS
39             +{}, # TAG_BY_WID
40             [], # PENDING
41             $maxwrk, # MAX_WORKERS
42             $result_event, # RESULT_EVENT
43             $error_event, # ERROR_EVENT
44             ], $class
45             }
46              
47             sub session_id { shift->[SESSID] }
48              
49             sub _wheels { shift->[WHEELS] }
50             sub _tag_by_wid { shift->[TAG_BY_WID] }
51             sub _requests { shift->[REQUESTS] }
52              
53             sub start {
54             my $self = shift;
55              
56             my $sess = POE::Session->create(
57             object_states => [
58             $self => +{
59             _start => 'px_start',
60             shutdown => 'px_shutdown',
61             cleanup => 'px_cleanup',
62              
63             calc => 'px_calc',
64             push => 'px_push',
65            
66             worker_timeout => 'px_worker_timeout',
67             worker_input => 'px_worker_input',
68             worker_stderr => 'px_worker_stderr',
69             worker_sigchld => 'px_worker_sigchld',
70             worker_closed => 'px_worker_closed',
71             },
72             ],
73             );
74              
75             $self->[SESSID] = $sess->ID;
76              
77             $self
78             }
79              
80              
81             sub px_start {
82             my ($kernel, $self) = @_[KERNEL, OBJECT];
83             $kernel->refcount_increment( $_[SESSION]->ID, 'Waiting for requests' );
84             }
85              
86             sub px_shutdown {
87             my ($kernel, $self) = @_[KERNEL, OBJECT];
88             $kernel->call( $_[SESSION], 'cleanup' );
89             $kernel->refcount_decrement( $_[SESSION]->ID, 'Waiting for requests' );
90             }
91              
92             sub px_cleanup {
93             my ($kernel, $self) = @_[KERNEL, OBJECT];
94             for my $pid (keys %{ $self->[WHEELS] }) {
95             if (my $wheel = delete $self->[WHEELS]->{$pid}) {
96             $wheel->kill('TERM')
97             }
98             }
99             $self->[TAG_BY_WID] = +{};
100             $self->[PENDING] = [];
101             }
102              
103             sub px_calc {
104             # calc => $expr, $hints
105             my ($kernel, $self) = @_[KERNEL, OBJECT];
106             my ($expr, $hints) = @_[ARG0, ARG1];
107             my $sender_id = $_[SENDER]->ID;
108              
109             unless (defined $expr) {
110             warn "'calc' event expected an EXPR and optional hints scalar";
111             $kernel->post( $sender_id => $self->[ERROR_EVENT] =>
112             "EXPR not defined",
113             $hints // +{}
114             );
115             }
116              
117             state $p = [ 'a' .. 'z', 1 .. 9 ];
118             my $tag = join '', map {; $p->[rand @$p] } 1 .. 3;
119             $tag .= $p->[rand @$p] while exists $self->[REQUESTS]->{$tag};
120              
121             my $pending = +{
122             expr => $expr,
123             tag => $tag,
124             hints => ($hints // +{}),
125             sender_id => $sender_id,
126             };
127              
128             $self->[REQUESTS]->{$tag} = $pending;
129             push @{ $self->[PENDING] }, $pending;
130             $kernel->yield('push');
131             }
132              
133             sub px_push {
134             my ($kernel, $self) = @_[KERNEL, OBJECT];
135              
136             return unless @{ $self->[PENDING] };
137              
138             if (keys %{ $self->[WHEELS] } >= $self->[MAX_WORKERS]) {
139             $kernel->delay( push => 0.5 );
140             return
141             }
142              
143             my $wheel = $self->_create_wheel;
144              
145             my $next = shift @{ $self->[PENDING] };
146             my $tag = $next->{tag};
147             $self->[TAG_BY_WID]->{ $wheel->ID } = $tag;
148              
149             $kernel->delay( worker_timeout => $self->[TIMEOUT], $wheel );
150             $wheel->put( [ $next->{tag}, $next->{expr} ] );
151             }
152              
153             sub _create_wheel {
154             my ($self) = @_;
155            
156             my $ppath = $Config{perlpath};
157             if ($^O ne 'VMS') {
158             $ppath .= $Config{_exe} unless $ppath =~ m/$Config{_exe}$/i;
159             }
160            
161             my $forkable;
162             if ($^O eq 'MSWin32') {
163             require Bot::Cobalt::Plugin::Calc::Worker;
164             $forkable = \&Bot::Cobalt::Plugin::Calc::Worker::worker
165             } else {
166             $forkable = [
167             $ppath,
168             (map {; '-I'.$_ } @INC),
169             '-MBot::Cobalt::Plugin::Calc::Worker',
170             '-e',
171             'Bot::Cobalt::Plugin::Calc::Worker->worker'
172             ]
173             }
174              
175             my $wheel = POE::Wheel::Run->new(
176             Program => $forkable,
177             StdioFilter => POE::Filter::Reference->new,
178             StderrEvent => 'worker_stderr',
179             StdoutEvent => 'worker_input',
180             CloseEvent => 'worker_closed',
181             );
182              
183             my $pid = $wheel->PID;
184             $poe_kernel->sig_child($pid, 'worker_sigchld');
185             $self->[WHEELS]->{$pid} = $wheel;
186              
187             $wheel
188             }
189              
190             sub px_worker_timeout {
191             my ($kernel, $self) = @_[KERNEL, OBJECT];
192             my $wheel = $_[ARG0];
193             $wheel->kill('INT');
194             }
195              
196             sub px_worker_input {
197             my ($kernel, $self) = @_[KERNEL, OBJECT];
198            
199             my ($input, $wid) = @_[ARG0, ARG1];
200             my ($tag, $result) = @$input;
201            
202             my $req = delete $self->[REQUESTS]->{$tag};
203             unless ($req) {
204             warn "BUG? worker input but no request found for tag '$tag'";
205             return
206             }
207            
208             $kernel->post( $req->{sender_id} => $self->[RESULT_EVENT] =>
209             $result, $req->{hints}
210             )
211             }
212              
213             sub px_worker_stderr {
214             my ($kernel, $self) = @_[KERNEL, OBJECT];
215             my ($input, $wid) = @_[ARG0, ARG1];
216             my $tag = $self->[TAG_BY_WID]->{$wid};
217             unless (defined $tag) {
218             warn "BUG? px_worker_stderr but no tag for wheel ID '$wid'";
219             }
220             my $req = delete $self->[REQUESTS]->{$tag};
221             if (defined $req) {
222             my $sender_id = $req->{sender_id};
223             my $hints = $req->{hints};
224             $kernel->post( $req->{sender_id} => $self->[ERROR_EVENT] =>
225             "worker '$wid': $input", $hints
226             )
227             } else {
228             warn "stderr from worker but request unavailable: $input"
229             }
230             }
231              
232             sub px_worker_closed {
233             my ($kernel, $self) = @_[KERNEL, OBJECT];
234             my $wid = $_[ARG0];
235             delete $self->[TAG_BY_WID]->{$wid};
236             }
237              
238             sub px_worker_sigchld {
239             my ($kernel, $self) = @_[KERNEL, OBJECT];
240             my $pid = $_[ARG1];
241             my $wheel = delete $self->[WHEELS]->{$pid} || return;
242             delete $self->[TAG_BY_WID]->{ $wheel->ID };
243             $kernel->yield('push')
244             }
245              
246              
247             1;