File Coverage

blib/lib/Test/Stream/IPC/Files.pm
Criterion Covered Total %
statement 161 161 100.0
branch 73 88 82.9
condition 6 6 100.0
subroutine 22 22 100.0
pod 6 10 60.0
total 268 287 93.3


line stmt bran cond sub pod time code
1             package Test::Stream::IPC::Files;
2 99     99   644 use strict;
  99         99  
  99         2198  
3 99     99   293 use warnings;
  99         90  
  99         1919  
4              
5 99     99   281 use base 'Test::Stream::IPC';
  99         101  
  99         6721  
6              
7             use Test::Stream::HashBase(
8 99         571 accessors => [qw/tempdir event_id tid pid globals/],
9 99     99   366 );
  99         123  
10              
11 99     99   438 use Scalar::Util qw/blessed/;
  99         117  
  99         3857  
12 99     99   66390 use File::Temp();
  99         1540465  
  99         2168  
13 99     99   52402 use Storable();
  99         224157  
  99         2098  
14 99     99   521 use File::Spec();
  99         143  
  99         1741  
15              
16 99     99   748 use Test::Stream::Util qw/try get_tid pkg_to_file/;
  99         134  
  99         820  
17              
18 161     161 1 502 sub is_viable { 1 }
19              
20             sub init {
21 305     305 0 349 my $self = shift;
22              
23 305         937 my $tmpdir = File::Temp::tempdir(CLEANUP => 0);
24              
25 305 50       97923 $self->abort_trace("Could not get a temp dir") unless $tmpdir;
26              
27 305         1692 $self->{+TEMPDIR} = File::Spec->canonpath($tmpdir);
28              
29             print STDERR "\nIPC Temp Dir: $tmpdir\n\n"
30 305 100       826 if $ENV{TS_KEEP_TEMPDIR};
31              
32 305         473 $self->{+EVENT_ID} = 1;
33              
34 305         458 $self->{+TID} = get_tid();
35 305         658 $self->{+PID} = $$;
36              
37 305         521 $self->{+GLOBALS} = {};
38              
39 305         658 return $self;
40             }
41              
42             sub hub_file {
43 1019     1019 0 937 my $self = shift;
44 1019         967 my ($hid) = @_;
45 1019         1217 my $tdir = $self->{+TEMPDIR};
46 1019         6403 return File::Spec->canonpath("$tdir/HUB-$hid");
47             }
48              
49             sub event_file {
50 141     141 0 188 my $self = shift;
51 141         213 my ($hid, $e) = @_;
52              
53 141         241 my $tempdir = $self->{+TEMPDIR};
54 141 100       791 my $type = blessed($e) or $self->abort("'$e' is not a blessed object!");
55              
56 140 100       809 $self->abort("'$e' is not an event object!")
57             unless $type->isa('Test::Stream::Event');
58              
59 139         580 my @type = split '::', $type;
60 139         662 my $name = join('-', $hid, $$, get_tid(), $self->{+EVENT_ID}++, @type);
61              
62 139         714 return File::Spec->canonpath("$tempdir/$name");
63             }
64              
65             sub add_hub {
66 490     490 1 1093 my $self = shift;
67 490         586 my ($hid) = @_;
68              
69 490         1045 my $hfile = $self->hub_file($hid);
70              
71 490 100       10513 $self->abort_trace("File for hub '$hid' already exists")
72             if -e $hfile;
73              
74 489 50       27442 open(my $fh, '>', $hfile) or $self->abort_trace("Could not create hub file '$hid': $!");
75 489         4450 print $fh "$$\n" . get_tid() . "\n";
76 489         16119 close($fh);
77             }
78              
79             sub drop_hub {
80 386     386 1 433 my $self = shift;
81 386         452 my ($hid) = @_;
82              
83 386         491 my $tdir = $self->{+TEMPDIR};
84 386         677 my $hfile = $self->hub_file($hid);
85              
86 386 100       5928 $self->abort_trace("File for hub '$hid' does not exist")
87             unless -e $hfile;
88              
89 385 50       9091 open(my $fh, '<', $hfile) or $self->abort_trace("Could not open hub file '$hid': $!");
90 385         5351 my ($pid, $tid) = <$fh>;
91 385         2046 close($fh);
92              
93 385 50       1349 $self->abort_trace("A hub file can only be closed by the process that started it\nExpected $pid, got $$")
94             unless $pid == $$;
95              
96 385 50       779 $self->abort_trace("A hub file can only be closed by the thread that started it\nExpected $tid, got " . get_tid())
97             unless get_tid() == $tid;
98              
99 385 100       888 if ($ENV{TS_KEEP_TEMPDIR}) {
100 1 50       35 rename($hfile, File::Spec->canonpath("$hfile.complete")) or $self->abort_trace("Could not rename file '$hfile' -> '$hfile.complete'");
101             }
102             else {
103 384 50       21378 unlink($hfile) or $self->abort_trace("Could not remove file for hub '$hid'");
104             }
105              
106 385 50       6505 opendir(my $dh, $tdir) or $self->abort_trace("Could not open temp dir!");
107 385         4894 for my $file (readdir($dh)) {
108 1128 100       1852 next if $file =~ m{\.complete$};
109 1126 100       6823 next unless $file =~ m{^$hid};
110 1         7 $self->abort_trace("Not all files from hub '$hid' have been collected!");
111             }
112 384         8186 closedir($dh);
113             }
114              
115             sub send {
116 143     143 1 339 my $self = shift;
117 143         1075 my ($hid, $e) = @_;
118              
119 143         274 my $tempdir = $self->{+TEMPDIR};
120 143         286 my $global = $hid eq 'GLOBAL';
121 143         444 my $hfile = $self->hub_file($hid);
122              
123 143 100 100     957 $self->abort("hub '$hid' is not available! Failed to send event!\n")
124             unless $global || -f $hfile;
125              
126 141         492 my $file = $self->event_file($hid, $e);
127 139         450 my $ready = File::Spec->canonpath("$file.ready");
128              
129 139 100       377 if ($global) {
130 129         170 my $name = $ready;
131 129         765 $name =~ s{^.*(GLOBAL)}{GLOBAL};
132 129         568 $self->globals->{$name}++;
133             }
134              
135             my ($ok, $err) = try {
136 139     139   685 Storable::store($e, $file);
137 138 50       36685 rename($file, $ready) or $self->abort("Could not rename file '$file' -> '$ready'");
138 139         1471 };
139 139 100       1105 if (!$ok) {
140 1         1 my $src_file = __FILE__;
141 1         18 $err =~ s{ at \Q$src_file\E.*$}{};
142 1         2 chomp($err);
143 1         2 my $tid = get_tid();
144 1         6 my $trace = $e->debug->trace;
145 1         4 my $type = blessed($e);
146              
147 1         7 $self->abort(<<" EOT");
148              
149             *******************************************************************************
150             There was an error writing an event:
151             Destination: $hid
152             Origin PID: $$
153             Origin TID: $tid
154             Event Type: $type
155             Event Trace: $trace
156             File Name: $file
157             Ready Name: $ready
158             Error: $err
159             *******************************************************************************
160              
161             EOT
162             }
163              
164 138         336 return 1;
165             }
166              
167             sub cull {
168 800     800 1 732 my $self = shift;
169 800         845 my ($hid) = @_;
170              
171 800         939 my $tempdir = $self->{+TEMPDIR};
172              
173 800 50       22017 opendir(my $dh, $tempdir) or $self->abort("could not open IPC temp dir ($tempdir)!");
174              
175 800         878 my @out;
176 800         12394 my @files = sort readdir($dh);
177 800         1730 for my $file (@files) {
178 3005 100       7369 next if $file =~ m/^\.+$/;
179 1405 100       22548 next unless $file =~ m/^(\Q$hid\E|GLOBAL)-.*\.ready$/;
180 55         194 my $global = $1 eq 'GLOBAL';
181 55 100 100     262 next if $global && $self->globals->{$file}++;
182              
183             # Untaint the path.
184 10         97 my $full = File::Spec->canonpath("$tempdir/$file");
185 10         42 ($full) = ($full =~ m/^(.*)$/gs);
186              
187 10         38 my $obj = $self->read_event_file($full);
188              
189             # Do not remove global events
190 10 100       25 unless ($global) {
191 8         40 my $complete = File::Spec->canonpath("$full.complete");
192 8 100       30 if ($ENV{TS_KEEP_TEMPDIR}) {
193 1 50       33 rename($full, $complete) or $self->abort("Could not rename IPC file '$full', '$complete'");
194             }
195             else {
196 7 50       488 unlink($full) or $self->abort("Could not unlink IPC file: $file");
197             }
198             }
199              
200 10         25 push @out => $obj;
201             }
202              
203 800         5872 closedir($dh);
204 800         4941 return @out;
205             }
206              
207             sub read_event_file {
208 14     14 0 38 my $self = shift;
209 14         20 my ($file) = @_;
210              
211 14         48 my $obj = Storable::retrieve($file);
212 14 100       956 $self->abort("Got an unblessed object: '$obj'")
213             unless blessed($obj);
214              
215 13 100       79 unless ($obj->isa('Test::Stream::Event')) {
216 3         7 my $pkg = blessed($obj);
217 3         9 my $mod_file = pkg_to_file($pkg);
218 3     3   14 my ($ok, $err) = try { require $mod_file };
  3         538  
219              
220 3 100       17 $self->abort("Event has unknown type ($pkg), tried to load '$mod_file' but failed: $err")
221             unless $ok;
222              
223 2 100       14 $self->abort("'$obj' is not a 'Test::Stream::Event' object")
224             unless $obj->isa('Test::Stream::Event');
225             }
226              
227 11         22 return $obj;
228             }
229              
230             sub waiting {
231 102     102 1 170 my $self = shift;
232 102         695 require Test::Stream::Event::Waiting;
233 102         897 $self->send(
234             GLOBAL => Test::Stream::Event::Waiting->new(
235             debug => Test::Stream::DebugInfo->new(frame => [caller()]),
236             )
237             );
238 102         957 return;
239             }
240              
241             sub DESTROY {
242 207     207   634 my $self = shift;
243              
244 207 100       665 return unless defined $self->pid;
245 206 100       1130 return unless defined $self->tid;
246              
247 205 100       932 return unless $$ == $self->pid;
248 203 100       901 return unless get_tid() == $self->tid;
249              
250 202         726 my $tempdir = $self->{+TEMPDIR};
251              
252 202 50       3953 opendir(my $dh, $tempdir) or $self->abort("Could not open temp dir! ($tempdir)");
253 202         1604 while(my $file = readdir($dh)) {
254 443 100       2204 next if $file =~ m/^\.+$/;
255 41 100       109 next if $file =~ m/\.complete$/;
256 39         255 my $full = File::Spec->canonpath("$tempdir/$file");
257              
258 39 100       168 if ($file =~ m/^(GLOBAL|HUB-)/) {
259 38         126 $full =~ m/^(.*)$/;
260 38         80 $full = $1; # Untaint it
261 38 100       95 next if $ENV{TS_KEEP_TEMPDIR};
262 37 50       1640 unlink($full) or $self->abort("Could not unlink IPC file: $full");
263 37         179 next;
264             }
265              
266 1         5 $self->abort("Leftover files in the directory ($full)!\n");
267             }
268 201         975 closedir($dh);
269              
270 201 100       449 if ($ENV{TS_KEEP_TEMPDIR}) {
271 1         5 print STDERR "# Not removing temp dir: $tempdir\n";
272 1         8 return;
273             }
274              
275 200 50       14357 rmdir($tempdir) or warn "Could not remove IPC temp dir ($tempdir)";
276             }
277              
278             1;
279              
280             __END__