File Coverage

blib/lib/Patro/Server.pm
Criterion Covered Total %
statement 494 695 71.0
branch 292 422 69.1
condition 49 106 46.2
subroutine 44 50 88.0
pod 0 25 0.0
total 879 1298 67.7


line stmt bran cond sub pod time code
1             package Patro::Server;
2 66     66   1341 use strict;
  66         150  
  66         1745  
3 66     66   303 use warnings;
  66         111  
  66         1594  
4 66     66   276 use Carp;
  66         102  
  66         3350  
5 66     66   5486 eval "use Sys::HostAddr";
  0         0  
  0         0  
6 66     66   339 use Socket ();
  66         118  
  66         1110  
7 66     66   284 use Scalar::Util 'reftype';
  66         156  
  66         3913  
8 66     66   19085 use POSIX ':sys_wait_h';
  66         325044  
  66         337  
9             require overload;
10              
11             our $threads_avail;
12       2607     *sxdiag = sub {};
13             if ($ENV{PATRO_SERVER_DEBUG}) {
14             *sxdiag = *::xdiag;
15             our $DEBUG = 1;
16             }
17             our $VERSION = '0.14';
18 66     66   114019 our @SERVERS :shared;
  66         63023  
  66         348  
19             our %OPTS = ( # XXX - needs documentation
20             keep_alive => 30,
21             idle_timeout => 30,
22             fincheck_freq => 5,
23             );
24              
25             sub new {
26 85     65 0 22866 my $pkg = shift;
27 65         127 my $opts = shift;
28              
29 65   33     364 my $host = $ENV{HOSTNAME} // qx(hostname) // "localhost";
      50        
30 65 50       208 if ($INC{'Sys/HostAddr.pm'}) {
31 0         0 my $host2 = Sys::HostAddr->new->main_ip;
32 0 50       0 $host = $host2 if $host2;
33             }
34 65         180 chomp($host);
35              
36 65 50       19092 socket(my $socket, Socket::PF_INET(), Socket::SOCK_STREAM(),
37             getprotobyname("tcp")) || croak __PACKAGE__, " socket: $!";
38 65 50       922 setsockopt($socket, Socket::SOL_SOCKET(), Socket::SO_REUSEADDR(),
39             pack("l",1)) || croak __PACKAGE__, " setsockopt: $!";
40 65         7459 my $sockaddr = Socket::pack_sockaddr_in(0, Socket::inet_aton($host));
41 65 50       1097 bind($socket, $sockaddr) || croak __PACKAGE__, " bind: $!";
42 65 50       541 listen($socket, Socket::SOMAXCONN()) || croak __PACKAGE__, " listen: $!";
43 65         379 $sockaddr = getsockname($socket);
44 65         358 my ($port,$addr) = Socket::unpack_sockaddr_in($sockaddr);
45              
46             my $meta = {
47             sockaddr => $sockaddr,
48             socket => $socket,
49             host => $host,
50             host2 => Socket::inet_aton($addr),
51             port => $port,
52              
53             creator_pid => $$,
54             creator_tid => $threads_avail && threads->tid,
55             style => $threads_avail ? 'threaded' : 'forked',
56              
57             keep_alive => $OPTS{keep_alive},
58             idle_timeout => $OPTS{idle_timeout},
59 65 50 33     1025175 version => $Patro::Server::VERSION,
60             };
61              
62 65         391 $Patro::SERVER_VERSION = $Patro::Server::VERSION;
63              
64 65         175 my $obj = {};
65 65         151 my @store;
66              
67 65 50       298 if ($threads_avail) {
68 0         0 for (@_) {
69 0         0 local $threads::shared::clone_warn = undef;
70 0         0 eval { $_ = threads::shared::shared_clone($_) };
  0         0  
71 0 0       0 if ($@ =~ /CODE|GLOB/) {
72 0         0 require Patro::LeumJelly;
73 0         0 warn $@;
74 0         0 $threads::shared::clone_warn = 0;
75 0         0 $_ = threads::shared::shared_clone($_);
76             }
77             }
78             }
79 65         311 foreach my $o (@_) {
80 104         232 my ($num,$str);
81             {
82 66     66   27416 no overloading;
  66         138  
  66         2511  
  104         158  
83 66     66   359 no warnings 'portable';
  66         123  
  66         85547  
84 104         276 $str = "$o";
85 104         806 ($num) = $str =~ /x(\w+)/;
86 104         395 $num = hex($num);
87             }
88 104         321 $obj->{$num} = $o;
89 104         526 my $reftype = Scalar::Util::reftype($o);
90 104         256 my $ref = CORE::ref($o);
91 104 50       494 if ($ref eq 'threadsx::shared::code') {
    50          
92 0         0 $ref = $reftype = 'CODE*';
93             } elsif ($ref eq 'threadsx::shared::glob') {
94 0         0 $ref = $reftype = 'GLOB';
95             }
96 104         505 my $store = {
97             ref => $ref,
98             reftype => $reftype,
99             id => $num
100             };
101 104 100       475 if (overload::Overloaded($o)) {
102 12 50 33     708 if ($ref ne 'CODE' && $ref ne 'CODE*' && $ref ne 'GLOB') {
      33        
103 12         36 $store->{overload} = _overloads($o);
104             }
105             }
106 104         4504 push @store, $store;
107             }
108 65         528 my $self = bless {
109             meta => $meta,
110             store => \@store,
111             obj => $obj
112             }, __PACKAGE__;
113 65         353 $self->{config} = $self->config;
114 65         368 $self->start_server;
115 20         198 eval { push @SERVERS, $self };
  20         244  
116 20 50       213 warn $@ if $@;
117 20 50       193 if (@SERVERS == 1) {
118 20         4275 eval q~END {
119             if ($Patro::Server::threads_avail) {
120             $_->detach for threads->list(threads::running);
121             }
122             }~;
123             }
124 20         825 return $self;
125             }
126              
127              
128             sub start_server {
129 65     65 0 134 my $self = shift;
130 65         178 my $meta = $self->{meta};
131 65 50       321 if ($meta->{style} eq 'threaded') {
132 0         0 my $server_thread;
133             $server_thread = threads->create(
134             sub {
135 0     0   0 $SIG{KILL} = sub { exit };
  0         0  
136 0         0 $SIG{CHLD} = sub { $self->watch_for_finishers(@_) };
  0         0  
137 0         0 $SIG{ALRM} = sub { $self->watch_for_finishers(@_) };
  0         0  
138 0 0       0 if ($self->{meta}{pid_file}) {
139 0         0 open my $fh, '>>', $self->{meta}{pid_file};
140 0         0 flock $fh, 2;
141 0         0 seek $fh, 0, 2;
142 0         0 print {$fh} "$$-", threads->tid, "\n";
  0         0  
143 0         0 close $fh;
144             }
145 0         0 $self->accept_clients;
146 0         0 return;
147 0         0 } );
148 0         0 $self->{meta}{server_thread} = $server_thread;
149 0         0 $self->{meta}{server_pid} = $$;
150 0         0 $self->{meta}{server_tid} = $server_thread->tid;
151             #$server_thread->detach;
152             } else {
153 65         61831 my $pid = CORE::fork();
154 65 50       3476 if (!defined($pid)) {
155 0         0 croak __PACKAGE__, " fork: $!";
156             }
157 65 100       1139 if ($pid == 0) {
158 45 50       903 if ($self->{meta}{pid_file}) {
159 0         0 open my $fh, '>>', $self->{meta}{pid_file};
160 0         0 flock $fh, 2;
161 0         0 seek $fh, 0, 2;
162 0         0 print {$fh} "$$\n";
  0         0  
163 0         0 close $fh;
164             }
165 45         4098 $self->accept_clients;
166 20         4636 exit;
167             }
168 20         771 $self->{meta}{server_pid} = $pid;
169             }
170             }
171              
172             # return list of operators that are overloaded on the given object
173             my @oplist;
174             sub _overloads {
175 16     16   26 my $obj = shift;
176 16 50       37 return unless overload::Overloaded($obj);
177 16 100       609 if (!@oplist) {
178 6         135 @oplist = split ' ',join(' ',values %overload::ops);
179             }
180              
181 16         38 my %ops = map { $_ => 1 } grep { overload::Method($obj,$_) } @oplist;
  607         1808  
  1200         34617  
182              
183             # we also need to account for the operations that are *implicitly*
184             # overloaded.
185              
186             # Many ops can be generated out of 0+, "", or bool
187 16 50 66     111 if ($ops{"0+"} || $ops{'""'} || $ops{bool}) {
      33        
188 16         159 $ops{$_}++ for qw(0+ "" bool int ! qr . x .= x= <> -X);
189             }
190              
191             # assignment ops can be generated from binary ops
192 16         45 foreach my $binop (qw(. x + - * / ** % & | ^ << >> &. |. ^.)) {
193 256 100       472 $ops{$binop . "="}++ if $ops{$binop};
194             }
195              
196             # all comparison ops can be generated from <=> and cmp
197 16 100       71 @ops{qw{< <= > >= == !=}} = (1) x 6 if $ops{"<=>"};
198 16 100       93 @ops{qw(le lt ge gt eq ne)} = (1) x 6 if $ops{cmp};
199              
200 16 100       37 $ops{neg}++ if $ops{"-"};
201 16 100       39 $ops{"--"}++ if $ops{"-="};
202 16 50 66     69 $ops{abs}++ if $ops{"<"} && $ops{neg};
203 16 100       34 $ops{"++"}++ if $ops{"+="};
204              
205             # all ops are overloaded if there is a 'nomethod' specified
206 16 50       28 @ops{@oplist} = (1) x @oplist if $ops{nomethod};
207 16         326 return [keys %ops];
208             }
209              
210             sub config {
211 65     65 0 139 my $self = shift;
212             my $config_data = {
213             host => $self->{meta}{host},
214             port => $self->{meta}{port},
215             store => $self->{store},
216             style => $self->{meta}{style},
217 65         760 version => $Patro::Server::VERSION
218             };
219 65         329 return bless $config_data, 'Patro::Config';
220             }
221              
222             ########################################
223              
224             sub Patro::Config::to_string {
225 10     10   5797 my ($self) = @_;
226 10         489 return Patro::LeumJelly::serialize({%$self});
227             }
228              
229             sub Patro::Config::to_file {
230 2     2   769 my ($self,$file) = @_;
231 2 50       12 if (!$file) {
232             # TODO: select a temp filename
233             }
234 2         5 my $fh;
235 2 50       173 if (!open($fh, '>', $file)) {
236 0         0 croak "Patro::Config::to_file: could not write cfga file '$file': $!";
237             }
238 2         6 print {$fh} $self->to_string;
  2         56  
239 2         208 close $fh;
240 2         10 return $file;
241             }
242              
243             sub Patro::Config::from_string {
244 12     12   37 my ($self, $string) = @_;
245 12         57 my $cfg = Patro::LeumJelly::deserialize($string);
246 12         504 return bless $cfg, 'Patro::Config';
247             }
248              
249             sub Patro::Config::from_file {
250 3     3   6 my ($self, $file) = @_;
251 3 50 33     13 if (!defined($file) && !CORE::ref($self) && $self ne 'Patro::Config') {
      33        
252 0         0 $file = $self;
253             }
254 3         4 my $fh;
255 3 50       58 if (CORE::ref($file) eq 'GLOB') {
    50          
256 0         0 $fh = $file;
257             } elsif (!open $fh, '<' ,$file) {
258 0         0 croak "Patro::Config::fron_file: could not read cfg file '$file': $!";
259             }
260 3         43 my $data = <$fh>;
261 3         22 close $fh;
262 3         12 return Patro::Config->from_string($data);
263             }
264              
265             ########################################
266              
267             sub accept_clients {
268             # accept connection from client
269             # spin off connection to separate thread or process
270             # perform request_response_loop on the client connection
271 45     45 0 348 my $self = shift;
272 45         303 my $meta = $self->{meta};
273              
274 45         575 $meta->{last_connection} = time;
275 45         382 $meta->{finished} = 0;
276              
277 45         426 while (!$meta->{finished}) {
278 90     26   5171 $SIG{CHLD} = sub { $self->watch_for_finishers(@_) };
  26         395  
279 90     0   1112 $SIG{ALRM} = sub { $self->watch_for_finishers(@_) };
  0         0  
280 90   50     1508 alarm ($OPTS{fincheck_freq} || 5);
281 90         325 my $client;
282 90         343 my $server = $meta->{socket};
283 90         34632647 my $paddr = accept($client,$server);
284 90 100       1691 if (!$paddr) {
285 25 50   66   657 next if $!{EINTR};
  66         18799  
  66         70654  
  66         537  
286 0 0 0     0 next if $!{ECHILD} || $!==10; # !?! why $!{ECHILD} not suff on Lin?
287 0         0 ::xdiag("accept failure, %errno is",\%!);
288 0         0 croak __PACKAGE__, ": accept ", 0+$!," $!";
289             }
290 65         359 $meta->{last_connection} = time;
291              
292 65         897 $self->start_subserver($client);
293 40         1350 $self->watch_for_finishers('MAIN');
294             }
295             }
296              
297             sub start_subserver {
298 65     65 0 409 my ($self,$client) = @_;
299 65 50       669 if ($self->{meta}{style} eq 'forked') {
300 65         47500 my $pid = CORE::fork();
301 65 50       2306 if (!defined($pid)) {
302 0         0 croak __PACKAGE__,": fork after accept $!";
303             }
304 65 100       1276 if ($pid != 0) {
305 40 50       381 if ($self->{meta}{pid_file}) {
306 0         0 open my $fh, '>>', $self->{meta}{pid_file};
307 0         0 flock $fh, 2;
308 0         0 seek $fh, 0, 2;
309 0         0 print {$fh} "$pid\n";
  0         0  
310 0         0 close $fh;
311             }
312 40         903 $self->{meta}{pids}{$pid}++;
313 40         885 return;
314             }
315 25         1301 $self->request_response_loop($client);
316 25         58393 exit;
317             } else {
318             my $subthread = threads->create(
319             sub {
320 0     0   0 $self->request_response_loop($client);
321 0         0 threads->self->detach;
322 0         0 return;
323 0         0 } );
324 0 0       0 if ($self->{meta}{pid_file}) {
325 0         0 open my $fh, '>>', $self->{meta}{pid_file};
326 0         0 flock $fh, 2;
327 0         0 seek $fh, 0, 2;
328 0         0 print {$fh} "$$-", $subthread->tid, "\n";
  0         0  
329 0         0 close $fh;
330             }
331 0         0 $self->{meta}{pids}{"$$-" . $subthread->tid}++;
332 0         0 push @{$self->{meta}{subthreads}}, $subthread;
  0         0  
333              
334             # $subthread->detach ?
335              
336 0         0 return;
337             }
338             }
339              
340             sub watch_for_finishers {
341 66     66 0 604 my ($self,$sig) = @_;
342 66         426 alarm 0;
343              
344             # XXX - how do you know when a thread is finished?
345             # what if it is a detached thread?
346              
347 66   66     2284 while ((my $pid = waitpid(-1,WNOHANG())) > 0 && WIFEXITED($?)) {
348 26         386 delete $self->{meta}{pids}{$pid};
349             }
350 66 50       376 if ($self->{meta}{subthreads}) {
351 0         0 my $n = @{$self->{meta}{subthreads}};
  0         0  
352 0         0 my $n1 = threads->list(threads::all());
353 0         0 my $n2 = threads->list(threads::running());
354 0         0 my @joinable = threads->list(threads::joinable());
355 0 0       0 if (@joinable) {
356 0         0 foreach my $subthread (@joinable) {
357             my ($i) = grep {
358 0         0 $self->{meta}{subthreads}{$_} == $subthread
  0         0  
359             } 0 .. $n-1;
360 0 0       0 if (!defined($i)) {
361 0         0 warn "subthread $subthread not found on this server!";
362 0         0 next;
363             }
364 0         0 $self->{meta}{subthreads}[$i]->join;
365 0         0 $self->{meta}{subthreads}[$i] = undef;
366             }
367             $self->{meta}{subthreads} =
368 0         0 [ grep { defined } @{$self->{meta}{subthreads} } ];
  0         0  
  0         0  
369             }
370             }
371 66 100       597 unless ($self->still_active) {
372 20         108 $self->{meta}{finished}++;
373             }
374 66     0   2041 $SIG{ALRM} = sub { $self->watch_for_finishers(@_) };
  0         0  
375 66     0   1240 $SIG{CHLD} = sub { $self->watch_for_finishers(@_) };
  0         0  
376 66   50     1801 alarm ($OPTS{fincheck_freq} || 5);
377             }
378              
379             sub still_active {
380 66     66 0 238 my $self = shift;
381 66         258 my $meta = $self->{meta};
382 66 50       377 if (time <= $meta->{keep_alive}) {
383 0         0 return 1;
384             }
385 66 100       329 if (time < $meta->{last_connection} + $meta->{idle_timeout}) {
386 41         262 return 1;
387             }
388 25 100       100 if (keys %{$meta->{pids}}) {
  25         231  
389 5         28 return 1;
390             }
391 20         139 return;
392             }
393              
394             sub request_response_loop {
395 25     25 0 264 my ($self, $client) = @_;
396              
397 25         229 local $Patro::Server::disconnect = 0;
398 25         952 my $fh0 = select $client;
399 25         557 $| = 1;
400 25         416 select $fh0;
401              
402 25         90737 while (my $req = readline($client)) {
403 497 50       4734 next unless $req =~ /\S/;
404 497         3123 sxdiag("server: got request '$req'");
405 497         1908 my $resp = $self->process_request($req);
406 497         1413 sxdiag("server: response to request is ",$resp);
407 497         1289 $resp = $self->serialize_response($resp);
408 497         1721 sxdiag("server: serialized response to request is ",$resp);
409 497         674 print {$client} $resp,"\n";
  497         21321  
410 497 100       21863381 last if $Patro::Server::disconnect;
411             }
412 25         1490 close $client;
413 25         179 return;
414             }
415              
416             our $SIDES; # for the server to activate or suppress some
417             # side-effects from the lower levels of the
418             # request handler
419              
420             sub process_request {
421 497     497 0 1192 my ($self,$request) = @_;
422 497 50       2545 croak "process_request: expected scalar request" if ref($request);
423              
424 497         1558 $request = Patro::LeumJelly::deserialize($request);
425 497         13524 my $topic = $request->{topic};
426 497 50       1225 if (!defined($topic)) {
427 0         0 return $self->error_response("bad topic in request '$_[1]'");
428             }
429            
430 497         808 my $has_args = $request->{has_args};
431 497         770 my $args = $request->{args};
432 497 100       1040 if ($request->{has_args}) {
433 342         539 local $@;
434             $args = [ map {
435 477 100       950 if (CORE::ref($_) eq '.Patroon') {
436 5         9 eval { $self->{obj}{$$_} };
  5         18  
437             } else {
438 472         1252 $_
439 342         473 } } @{$request->{args}} ];
  342         897  
440 342 50       870 if ($@) {
441 0         0 return $self->error_response($@);
442             }
443             }
444 497         838 my $id = $request->{id};
445 497         814 my $cmd = $request->{command};
446 497         833 my $ctx = $request->{context};
447 497 100       1305 my @orig_args = $has_args ? @$args : ();
448 497 100       1167 my @orig_refs = $has_args ? \ (@$args) : ();
449 497         1437 my @orig_dump = map Patro::LeumJelly::serialize([$_]), @$args;
450 497         13725 local $! = 0;
451 497         1724 local $? = 0;
452 497         1049 local $SIDES = {};
453 497         754 my @r;
454 497         613 our $DEBUG;
455 497   50     2697 local $DEBUG = $DEBUG || $request->{_debug} || 0;
456              
457 497 100       1904 if ($topic eq 'META') {
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    50          
458 40         237 @r = $self->process_request_META($id,$cmd,$ctx,$has_args,$args);
459             } elsif ($topic eq 'HASH') {
460 61         211 @r = $self->process_request_HASH($id,$cmd,$ctx,$has_args,$args);
461             } elsif ($topic eq 'ARRAY') {
462 240         667 @r = $self->process_request_ARRAY($id,$cmd,$ctx,$has_args,$args);
463             } elsif ($topic eq 'SCALAR') {
464 18         76 @r = $self->process_request_SCALAR($id,$cmd,$ctx,$has_args,$args);
465             } elsif ($topic eq 'METHOD') {
466 30         167 @r = $self->process_request_METHOD($id,$cmd,$ctx,$has_args,$args);
467             } elsif ($topic eq 'CODE') {
468 3         35 @r = $self->process_request_CODE($id,undef,$ctx,$has_args,$args);
469             } elsif ($topic eq 'HANDLE') {
470 85         325 @r = $self->process_request_HANDLE($id,$cmd,$ctx,$has_args,$args);
471             } elsif ($topic eq 'OVERLOAD') {
472 15         38 my $obj = $self->{obj}{$id};
473 15         62 @r = $self->process_request_OVERLOAD($obj,$cmd,$args,$ctx);
474             } elsif ($topic eq 'REF') {
475 5         19 @r = $self->process_request_REF($id,$cmd,$ctx,$has_args,$args);
476             } else {
477 0         0 @r = ();
478 0         0 $@ = __PACKAGE__ . ": unrecognized topic '$topic' in proxy request";
479             }
480 497 100 100     2385 if (@r && CORE::ref($r[0]) eq '.Patroclus') {
481 40         436 return $r[0];
482             }
483 457         1631 my $sides = bless {}, '.Patroclus';
484              
485 457 100       1540 $sides->{errno} = 0 + $! if $!;
486 457 100       1569 $sides->{errno_extended} = $^E if $^E;
487 457 100       1009 $sides->{child_error} = $? if $?;
488 457 100       851 $sides->{error} = $@ if $@;
489 457         1069 $sides->{"x-requestId"} = ++$Patro::Server::requestId;
490              
491             # how to update elements of @_ that have changes?
492             # three implementations below. Pick one.
493             # 1. "side A" - return all elements of @_. You will have to
494             # filter out "Modification of a read-only element attempted ..."
495             # messages
496             # 2. "side B" - do a deep comparison of original and final
497             # elements of @_, return the ones that mismatch I CHOOSE YOU!
498             # 3. original implementation - do shallow comparison of original
499             # and final elements of @_. Insufficient for code that updates
500             # nested data of the inputs
501 457         697 my (@out,@outref);
502              
503             # "sideB" - do a deep compare for all arguments
504 457   100     1943 for (my $j=0; $j<@$args && !$SIDES->{no_out}; $j++) {
505 407         1321 my $dj = Patro::LeumJelly::serialize([$args->[$j]]);
506 407         11476 for (my $i=0; $i<@orig_refs; $i++) {
507 1008 100       2712 next if $orig_refs[$i] != \$args->[$j];
508 407 100       1498 if ($orig_dump[$i] ne $dj) {
509 14         52 push @out, $i, $args->[$j];
510             }
511             }
512             }
513 457         893 $sides->{sideB} = 1;
514              
515 457 100       972 $sides->{out} = \@out if @out;
516 457 50       888 $sides->{outref} = \@outref if @outref;
517 457 100 100     1905 if ($ctx >= 2) {
    100          
518 15         70 return $self->list_response($sides, @r);
519             } elsif ($ctx == 1 && defined $r[0]) {
520 410         1221 my $y = $self->scalar_response($sides, $r[0]);
521             # if ($topic eq 'REF') { ::xdiag("response:",$y) }
522 410         3500 return $y;
523             } else {
524 32         146 return $self->void_response($sides);
525             }
526             }
527              
528             sub process_request_META {
529 40     40 0 178 my ($self,$id,$cmd,$ctx,$has_args,$args) = @_;
530 40 100       158 if ($cmd eq 'disconnect') {
531 4         11 $Patro::Server::disconnect = 1;
532 4         50 return bless { disconnect_ok => 1 }, '.Patroclus';
533             }
534 36         162 my $obj = $self->{obj}{$id};
535 36 50       269 if ($cmd eq 'ref') {
    50          
    50          
536 0         0 return CORE::ref($obj);
537             } elsif ($cmd eq 'reftype') {
538 0         0 return Scalar::Util::reftype($obj);
539             } elsif ($cmd eq 'destroy') {
540 36         187 delete $self->{obj}{$id};
541 36         80 my @ids = keys %{$self->{obj}};
  36         214  
542 36 100       168 if (@ids == 0) {
543 7         31 $Patro::Server::disconnect = 1;
544 7         94 return bless { disconnect_ok => 1 }, '.Patroclus';
545             } else {
546 29         300 return bless { disconnect_ok => 0,
547             num_reminaing_objs => 0+@ids }, '.Patroclus';
548             }
549             } else {
550 0         0 $@ = "Patro: unsupported meta command '$cmd'";
551 0         0 return;
552             }
553             }
554              
555             sub process_request_HASH {
556 61     61 0 200 my ($self,$id,$cmd,$ctx,$has_args,$args) = @_;
557 61         172 my $obj = $self->{obj}{$id};
558 61 50       411 if (reftype($obj) ne 'HASH') {
559 0         0 $@ = "Not a HASH reference";
560 0         0 return;
561             # !!! what if '%{}' op is overloaded?
562             }
563 61 100       198 if ($cmd eq 'STORE') {
    100          
    100          
    50          
    0          
    0          
    0          
    0          
564 6         20 my ($key,$val) = @$args;
565 6         18 my $old_val = $obj->{$key};
566 6         106 $obj->{$key} = threads::shared::shared_clone($val);
567 6         23 return $old_val;
568             } elsif ($cmd eq 'FETCH') {
569 45         216 return $obj->{$args->[0]};
570             } elsif ($cmd eq 'DELETE') {
571 2         20 return delete $obj->{$args->[0]};
572             } elsif ($cmd eq 'EXISTS') {
573 8         35 return exists $obj->{$args->[0]};
574             } elsif ($cmd eq 'CLEAR') {
575 0         0 %$obj = ();
576 0         0 return;
577             } elsif ($cmd eq 'FIRSTKEY') {
578 0         0 keys %$obj;
579 0         0 my ($k,$v) = each %$obj;
580 0         0 return $k;
581             } elsif ($cmd eq 'NEXTKEY') {
582 0         0 my ($k,$v) = each %$obj;
583 0         0 return $k;
584             } elsif ($cmd eq 'SCALAR') {
585 0         0 return scalar %$obj;
586             } else {
587 0         0 $@ = "HASH function '$cmd' not recognized";
588 0         0 return;
589             }
590             }
591              
592             sub process_request_ARRAY {
593 240     240 0 578 my ($self,$id,$cmd,$ctx,$has_args,$args) = @_;
594 240         468 my $obj = $self->{obj}{$id};
595 240 50       772 if (reftype($obj) ne 'ARRAY') {
596 0         0 $@ = "Not an ARRAY ref";
597 0         0 return;
598             }
599 240 100 33     717 if ($cmd eq 'STORE') {
    100          
    100          
    50          
    100          
    100          
    100          
    100          
    50          
    0          
600 6         17 my ($index,$val) = @$args;
601 6         13 my $old_val = $obj->[$index];
602             # ?!!!? does $val have to be shared?
603 6         13 eval { $obj->[$index] = threads::shared::shared_clone($val) };
  6         48  
604 6         41 return $old_val;
605             } elsif ($cmd eq 'FETCH') {
606 174         244 return eval { $obj->[$args->[0]] };
  174         543  
607             } elsif ($cmd eq 'FETCHSIZE') {
608 32         81 return scalar @$obj;
609             } elsif ($cmd eq 'STORESIZE' || $cmd eq 'EXTEND') {
610 0         0 my $n = $#{$obj} = $args->[0]-1;
  0         0  
611 0         0 return $n+1;
612             } elsif ($cmd eq 'SPLICE') {
613 16         34 my ($off,$len,@list) = @$args;
614 16 100       30 if ($off < 0) {
615 3         5 $off += @$obj;
616 3 50       6 if ($off < 0) {
617 0         0 $@ = "Modification of non-createable array value attempted, "
618             . "subscript $off";
619 0         0 return;
620             }
621             }
622 16 100 66     56 if (!defined($len) || $len eq 'undef') {
623 2         3 $len = @{$obj} - $off;
  2         4  
624             }
625 16 100       31 if ($len < 0) {
626 2         3 $len += @{$obj} - $off;
  2         13  
627 2 50       9 if ($len < 0) {
628 0         0 $len = 0;
629             }
630             }
631 16         21 my @val = splice @{$obj}, $off, $len, @list;
  16         88  
632 16         45 $SIDES->{no_out} = 1; # don't try to update @_
633             # SPLICE is the only ARRAY function that doesn't assume scalar context
634 16 100       31 if ($ctx == 1) {
635 6 50       51 return @val > 0 ? $val[-1] : undef;
636             } else {
637 10         25 return @val;
638             }
639             } elsif ($cmd eq 'PUSH') {
640 4         7 return push @{$obj}, map threads::shared::shared_clone($_), @$args;
  4         55  
641             } elsif ($cmd eq 'UNSHIFT') {
642 2         5 return unshift @{$obj}, map threads::shared::shared_clone($_), @$args;
  2         11  
643             } elsif ($cmd eq 'POP') {
644 3         5 return pop @{$obj};
  3         10  
645             } elsif ($cmd eq 'SHIFT') {
646 3         8 return shift @{$obj};
  3         18  
647             } elsif ($cmd eq 'EXISTS') {
648 0         0 return exists $obj->[$args->[0]];
649             } else {
650 0         0 $@ = "tied ARRAY function '$cmd' not recognized";
651 0         0 return;
652             }
653             }
654              
655             sub process_request_SCALAR {
656 18     18 0 76 my ($self,$id,$cmd,$ctx,$has_args,$args) = @_;
657 18         55 my $obj = $self->{obj}{$id};
658 18 50       108 if (reftype($obj) ne 'SCALAR') {
659 0         0 $@ = "Not a SCALAR reference";
660 0         0 return;
661             }
662 18 100       61 if ($cmd eq 'STORE') {
    50          
663 6         13 my $val = ${$obj};
  6         19  
664 6         88 ${$obj} = threads::shared::shared_clone($args->[0]);
  6         24  
665 6         29 return $val;
666             } elsif ($cmd eq 'FETCH') {
667 12         24 return ${$obj};
  12         58  
668             } else {
669 0         0 $@ = "tied SCALAR function '$cmd' not recognized";
670 0         0 return;
671             }
672             }
673              
674             sub process_request_METHOD {
675 30     30 0 108 my ($self,$id,$command,$context,$has_args,$args) = @_;
676 30         91 my $obj = $self->{obj}{$id};
677 30 50       95 if (!$obj) {
678 0         0 $@ = "Bad object id '$id' in proxy method call";
679 0         0 return;
680             }
681 30         93 my @r;
682 30 100       190 if ($command =~ /::/) {
    100          
683 66     66   157783 no strict 'refs';
  66         178  
  66         122396  
684 1 50       4 if ($context < 2) {
685 1 50       2 @r = scalar eval { $has_args ? &$command($obj,@$args)
  1         21  
686             : &$command($obj) };
687             } else {
688 0 0       0 @r = eval { $has_args ? &$command($obj,@$args)
  0         0  
689             : &$command($obj) };
690             }
691             } elsif ($context < 2) {
692 28 100       50 @r = scalar eval { $has_args ? $obj->$command(@$args)
  28         368  
693             : $obj->$command };
694             } else {
695 1 50       3 @r = eval { $has_args ? $obj->$command(@$args)
  1         9  
696             : $obj->$command };
697             }
698 30         372 return @r;
699             }
700              
701             sub process_request_HANDLE {
702 85     85 0 266 my ($self,$id,$command,$context,$has_args,$args) = @_;
703 85         251 my $obj = $self->{obj}{$id};
704 85 50       226 my $fh = CORE::ref($obj) eq 'threadsx::shared::glob' ? $obj->glob : $obj;
705 85 100 66     997 if ($command eq 'PRINT') {
    100 100        
    50          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    50          
    100          
    100          
    50          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    100          
    50          
706 7         15 my $z = print {$fh} @$args;
  7         216  
707 7         38 return $z;
708             } elsif ($command eq 'PRINTF') {
709 1 50       6 if ($has_args) {
710 1         3 my $template = shift @$args;
711 1         4 my $z = printf {$fh} $template, @$args;
  1         20  
712 1         5 return $z;
713             } else {
714             # I don't think we can get here through the proxy
715 0         0 my $z = printf {$fh} "";
  0         0  
716 0         0 return $z;
717             }
718             } elsif ($command eq 'WRITE') {
719 0 0       0 if (@$args < 2) {
720 0         0 return $self->error_response("Not enough arguments for syswrite");
721             }
722 0   0     0 return syswrite($fh, $args->[0],
      0        
723             $args->[1] // undef, $args->[2] // undef);
724             } elsif ($command eq 'READLINE') {
725 13         26 my @val;
726 13 100       46 if ($context > 1) {
727 2         28 my @val = readline($fh);
728 2         12 return @val;
729             } else {
730 11         142 my $val = readline($fh);
731 11         59 return $val;
732             }
733             } elsif ($command eq 'GETC') {
734 5         28 my $ch = getc($fh);
735 5         23 return $ch;
736             } elsif ($command eq 'READ' || $command eq 'READ?' ||
737             $command eq 'SYSREAD') {
738 3         10 local $Patro::read_sysread_flag; # don't clobber
739 3 50       10 if (@$args < 2) {
740             # I don't think we can get here through the proxy
741 0         0 $@ = "Not enough arguments for " . lc($command);
742 0         0 return;
743             }
744 3         6 my (undef, $len, $off) = @$args;
745 3         4 my $z;
746 3 100 33     13 if ($command eq 'SYSREAD' ||
      66        
747             ($command eq 'READ?' && fileno($fh) >= 0)) {
748 1   50     7 $z = sysread $fh, $args->[0], $len, $off || 0;
749             } else {
750             # sysread doesn't work, for example, on file handles opened
751             # from a reference to a scalar
752 2   50     11 $z = read $fh, $args->[0], $len, $off || 0;
753             }
754 3         10 return $z;
755             } elsif ($command eq 'EOF') {
756 2         24 return eof($fh);
757             } elsif ($command eq 'FILENO') {
758 1         4 my $z = fileno($fh);
759 1         3 return $z;
760             } elsif ($command eq 'SEEK') {
761 2 50       26 if (@$args < 2) {
    50          
762 0         0 $@ = "Not enough arguments for seek";
763 0         0 return;
764             } elsif (@$args > 2) {
765 0         0 $@ = "Too many arguments for seek";
766 0         0 return;
767             } else {
768 2         23 my $z = seek $fh, $args->[0], $args->[1];
769 2         12 return $z;
770             }
771             } elsif ($command eq 'TELL') {
772 8         32 my $z = tell($fh);
773 8         29 return $z;
774             } elsif ($command eq 'BINMODE') {
775 3         5 my $z;
776 3 100       10 if (@$args) {
777 2         156 $z = binmode $fh, $args->[0];
778             } else {
779 1         9 $z = binmode $fh;
780             }
781 3         234 return $z;
782             } elsif ($command eq 'CLOSE') {
783 8 50       27 if ($Patro::SECURE) {
784 0         0 $@ = "Patro: insecure CLOSE operation on proxy filehandle";
785 0         0 return;
786             }
787 8         136 my $z = close $fh;
788 8         45 return $z;
789             } elsif ($command eq 'OPEN') {
790 5 50       16 if ($Patro::SECURE) {
791 0         0 $@ = "Patro: insecure OPEN operation on proxy filehandle";
792 0         0 return;
793             }
794 5         14 my $z;
795 5         14 my $mode = shift @$args;
796 5 100       14 if (@$args == 0) {
797 1         34 $z = open $fh, $mode;
798             } else {
799 4         15 my $expr = shift @$args;
800 4 100       10 if (@$args == 0) {
801 3         164 $z = open $fh, $mode, $expr;
802             } else {
803 1         2057 $z = open $fh, $mode, $expr, @$args;
804             }
805             }
806              
807             # it is hard to set autoflush from the proxy.
808             # Since it is usually what you want, let's do it here.
809 5 50       29 if ($z) {
810 5         35 my $fh_sel = select $fh;
811 5         22 $| = 1;
812 5         23 select $fh_sel;
813             }
814 5         34 return $z;
815             }
816             # commands that are not in the tied filehandle
817             elsif ($command eq 'TRUNCATE') {
818 1         18 my $z = truncate $fh, $args->[0];
819 1         7 return $z;
820             } elsif ($command eq 'FCNTL') {
821 0         0 my $z = fcntl $fh, $args->[0], $args->[1];
822 0         0 return $z;
823             } elsif ($command eq 'FLOCK') {
824 4         20 my $z = flock $fh, $args->[0];
825 4         11 return $z;
826             } elsif ($command eq 'STAT') {
827 2 100       9 if ($context < 2) {
828 1         15 return scalar stat $fh;
829             } else {
830 1         7 return stat $fh;
831             }
832             } elsif ($command eq 'LSTAT') {
833 0 0       0 if ($context < 2) {
834 0         0 return scalar lstat $fh;
835             } else {
836 0         0 return lstat $fh;
837             }
838             } elsif ($command eq '-X') {
839 6         20 my $key = $args->[0];
840 6         589 return eval "-$key \$fh";
841             } elsif ($command eq 'SYSOPEN') {
842 3 50       17 if ($Patro::SECURE) {
843 0         0 $@ = "Patro: insecure SYSOPEN operation on proxy filehandle";
844 0         0 return;
845             }
846 3 50       27 my $z = @$args <= 2 ? sysopen $fh, $args->[0], $args->[1]
847             : sysopen $fh, $args->[0], $args->[1], $args->[2];
848 3         20 return $z;
849              
850             # commands that operate on DIRHANDLEs
851             } elsif ($command eq 'OPENDIR') {
852 1 50       5 if ($Patro::SECURE) {
853 0         0 $@ = "Patro: insecure OPENDIR operation on proxy dirhandle";
854 0         0 return;
855             }
856 1         18 return opendir $fh, $args->[0];
857             } elsif ($command eq 'REWINDDIR') {
858 1         5 return rewinddir $fh;
859             } elsif ($command eq 'TELLDIR') {
860 3         20 return telldir $fh;
861             } elsif ($command eq 'READDIR') {
862 3 100       7 if ($context < 2) {
863 2         19 return scalar readdir $fh;
864             } else {
865 1         4 my @r = readdir $fh;
866 1         14 return @r;
867             }
868             } elsif ($command eq 'SEEKDIR') {
869 1         5 return seekdir $fh, $args->[0];
870             } elsif ($command eq 'CLOSEDIR') {
871 1         7 return closedir $fh;
872             } elsif ($command eq 'CHDIR') {
873 1         5 return chdir $fh;
874            
875             } else {
876 0         0 $@ = "tied HANDLE function '$command' not found";
877 0         0 return;
878             }
879             }
880              
881             sub process_request_CODE {
882 3     3 0 29 my ($self,$id,$command_NOTUSED,$context,$has_args,$args) = @_;
883 3         20 my $sub = $self->{obj}{$id};
884 3 50       22 if (CORE::ref($sub) eq 'threadsx::shared::code') {
885 0         0 $sub = $sub->code;
886             }
887 3 50       17 if ($context < 2) {
888 3 100       28 return scalar eval { $has_args ? $sub->(@$args) : $sub->() };
  3         54  
889             } else {
890 0 0       0 return eval { $has_args ? $sub->(@$args) : $sub->() };
  0         0  
891             }
892             }
893              
894             sub process_request_OVERLOAD {
895 15     15 0 39 my ($self,$x,$op,$args,$context) = @_;
896 15 100       62 if ($op eq '@{}') {
    100          
    50          
    50          
897 2         16 return \@$x;
898             } elsif ($op eq '%{}') {
899 1         6 return \%$x;
900             } elsif ($op eq '&{}') {
901 0         0 return \&$x;
902             } elsif ($op eq '${}') {
903 0         0 return \$$x;
904             } # elsif ($op eq '*{}') { return \*$x; }
905 12         29 my ($y,$swap) = @$args;
906 12 50       21 if ($swap) {
907 0         0 ($x,$y) = ($y,$x);
908             }
909 12         32 local $@ = '';
910 12         18 my $z;
911 12 50       41 if ($op =~ /[&|~^][.]=?/) {
912 0         0 $op =~ s/\.//;
913             }
914 12 50 33     273 if ($op eq '-X') {
    50 33        
    50 33        
    50 33        
    50 33        
    50 33        
    100 33        
    50 33        
    100 33        
    50          
915 0         0 $z = eval "-$y \$x";
916             } elsif ($op eq 'neg') {
917 0         0 $z = eval { -$x };
  0         0  
918             } elsif ($op eq '!' || $op eq '~' || $op eq '++' || $op eq '--') {
919 0         0 $z = eval "$op\$x";
920             } elsif ($op eq 'qr') {
921 0         0 $z = eval { qr/$x/ };
  0         0  
922             } elsif ($op eq 'atan2') {
923 0         0 $z = eval { atan2($x,$y) };
  0         0  
924             } elsif ($op eq 'cos' || $op eq 'sin' || $op eq 'exp' || $op eq 'abs' ||
925             $op eq 'int' || $op eq 'sqrt' || $op eq 'log') {
926 0         0 $z = eval "$op(\$x)";
927             } elsif ($op eq 'bool') {
928 3 50       10 $z = eval { $x ? 1 : 0 }; # this isn't right
  3         188  
929             } elsif ($op eq '0+') {
930 0         0 $z = eval "0 + \$x"; # this isn't right, either
931             } elsif ($op eq '""') {
932 2         4 $z = eval { "$x" };
  2         28  
933             } elsif ($op eq '<>') {
934             # always scalar context readline
935 0         0 $z = eval { readline($x) };
  0         0  
936             } else { # binary operator
937 7         462 $z = eval "\$x $op \$y";
938             }
939 12 50       573 if ($@) {
940 0         0 return;
941             }
942 12 50       37 if ($threads_avail) {
943 0         0 $z = threads::shared::shared_clone($z);
944             }
945 12         39 return $z;
946             }
947              
948             sub process_request_REF {
949 5     5 0 15 my ($self,$id,$command,$context,$has_args,$args) = @_;
950 5         10 my $obj = $self->{obj}{$id};
951 5 50       33 if (reftype($obj) ne 'REF') {
952 0         0 $@ = "Not a REF";
953 0         0 return;
954             }
955 5 50       15 if ($command eq 'deref') {
956 5         14 return $$obj;
957             }
958 0         0 $@ = "$command is not an appropriate operation for REF";
959 0         0 return;
960             }
961              
962             ########################################
963              
964             sub void_response {
965 32     32 0 73 my $addl = {};
966 32 50 33     241 if (@_ > 0 && CORE::ref($_[-1]) eq '.Patroclus') {
967 32         95 $addl = pop @_;
968             }
969 32         434 return +{ context => 0, response => undef, %$addl };
970             }
971              
972             sub scalar_response {
973 410     410 0 855 my ($self,$sides,$val) = @_;
974             return +{
975 410         2119 context => 1,
976             response => $val,
977             %$sides
978             };
979             }
980              
981             sub list_response {
982 15     15 0 42 my ($self,$sides,@val) = @_;
983             return +{
984 15         201 context => 2,
985             response => \@val,
986             %$sides
987             };
988             }
989              
990             sub error_response {
991 0     0 0 0 my ($self,@msg) = @_;
992 0         0 return { error => join('', @msg) };
993             }
994              
995             ########################################
996              
997             sub serialize_response {
998 497     497 0 969 my ($self, $resp) = @_;
999 497 100       1074 if ($resp->{context}) {
1000 425 100       869 if ($resp->{context} == 1) {
    50          
1001 410         1009 $resp->{response} = patrol($self,$resp,$resp->{response});
1002             } elsif ($resp->{context} == 2) {
1003             $resp->{response} = [
1004 15         22 map patrol($self,$resp,$_), @{$resp->{response}} ];
  15         60  
1005             }
1006             }
1007              
1008 497 100       1184 if ($resp->{out}) {
1009 12         35 $resp->{out} = [ map patrol($self,$resp,$_), @{$resp->{out}} ];
  12         50  
1010             }
1011              
1012 497         1064 sxdiag("Server: final response before serialization: ",$resp);
1013 497         1027 $resp = Patro::LeumJelly::serialize($resp);
1014 497         20009 return $resp;
1015             }
1016              
1017             # we should not send any serialized references back to the client.
1018             # replace any references in the response with an
1019             # object id.
1020             sub patrol {
1021 506     506 0 906 my ($self,$resp,$obj) = @_;
1022 506 50       1694 sxdiag("patrol: called on: ",defined($obj) ? "$obj" : "");
1023 506 100       1306 return $obj unless ref($obj);
1024              
1025 62 50       158 if ($threads_avail) {
1026 0 0       0 if (CORE::ref($obj) eq 'CODE') {
    0          
1027 0         0 $obj = threadsx::shared::code->new($obj);
1028 0         0 sxdiag("patrol: coderef converted");
1029             } elsif (CORE::ref($obj) eq 'GLOB') {
1030 0         0 $obj = threadsx::shared::glob->new($obj);
1031 0         0 sxdiag("patrol: glob converted");
1032             }
1033             }
1034              
1035 62         96 my $id = do {
1036 66     66   562 no overloading;
  66         124  
  66         21623  
1037 62         139 0 + $obj;
1038             };
1039              
1040 62 100       221 if (!$self->{obj}{$id}) {
1041 31         121 $self->{obj}{$id} = $obj;
1042 31         73 my $ref = CORE::ref($obj);
1043 31         61 my $reftype;
1044 31 50       115 if ($ref eq 'threadsx::shared::code') {
    50          
1045 0         0 $ref = 'CODE';
1046 0         0 $reftype = 'CODE';
1047             } elsif ($ref eq 'threadsx::shared::glob') {
1048 0         0 $ref = 'GLOB';
1049 0         0 $reftype = 'GLOB';
1050             } else {
1051 31         129 $reftype = Scalar::Util::reftype($obj);
1052             }
1053 31         136 sxdiag("patrol: ref types for $id are $ref,$reftype");
1054 31         274 $resp->{meta}{$id} = {
1055             id => $id, ref => $ref, reftype => $reftype
1056             };
1057 31 100       194 if (overload::Overloaded($obj)) {
1058 4         246 $resp->{meta}{$id}{overload} = _overloads($obj);
1059             }
1060 31         1758 sxdiag("new response meta: ",$resp->{meta}{$id});
1061             } else {
1062 31         111 sxdiag("id $id has been seen before");
1063             }
1064 62         290 return bless \$id,'.Patrobras';
1065             }
1066              
1067             sub TEST_MODE {
1068 65 50   65 0 234 if ($INC{'perl5db.pl'}) {
1069 0         0 ::xdiag("TEST_MODE adjusted for debugging");
1070 0         0 $OPTS{keep_alive} = 3600;
1071 0         0 $OPTS{fincheck_freq} = 3500;
1072 0         0 $OPTS{idle_timeout} = 3600;
1073 0         0 alarm 9999;
1074 0         0 return;
1075             }
1076 65         136 $OPTS{keep_alive} = 2;
1077 65         113 $OPTS{fincheck_freq} = 2;
1078 65         98 $OPTS{idle_timeout} = 1;
1079 65 50       207 if ($threads_avail) {
1080 0         0 $OPTS{fincheck_freq} = "0 but true";
1081             }
1082             }
1083              
1084             1;
1085              
1086             =head1 NAME
1087              
1088             Patro::Server - remote object server for Patro
1089              
1090             =head1 VERSION
1091              
1092             0.14
1093              
1094             =head1 DESCRIPTION
1095              
1096             A server class for making references available to proxy clients
1097             in the L distribution.
1098             The server handles requests for any references that are being served,
1099             manipulates references on the server, and returns the results of
1100             operations to the proxy objects on the clients.
1101              
1102             =head1 LICENSE AND COPYRIGHT
1103              
1104             MIT License
1105              
1106             Copyright (c) 2017, Marty O'Brien
1107              
1108             Permission is hereby granted, free of charge, to any person obtaining a copy
1109             of this software and associated documentation files (the "Software"), to deal
1110             in the Software without restriction, including without limitation the rights
1111             to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
1112             copies of the Software, and to permit persons to whom the Software is
1113             furnished to do so, subject to the following conditions:
1114              
1115             The above copyright notice and this permission notice shall be included in all
1116             copies or substantial portions of the Software.
1117              
1118             THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
1119             IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
1120             FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
1121             AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
1122             LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
1123             OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
1124             SOFTWARE.
1125              
1126             =cut