File Coverage

blib/lib/Parallel/Scoreboard.pm
Criterion Covered Total %
statement 81 88 92.0
branch 19 36 52.7
condition 4 9 44.4
subroutine 16 18 88.8
pod 4 4 100.0
total 124 155 80.0


line stmt bran cond sub pod time code
1             package Parallel::Scoreboard;
2              
3 2     2   779 use strict;
  2         2  
  2         61  
4 2     2   7 use warnings;
  2         3  
  2         55  
5              
6 2     2   7 use Digest::MD5 qw(md5);
  2         2  
  2         99  
7 2     2   7 use Fcntl qw(:flock);
  2         4  
  2         195  
8 2     2   8 use IO::Handle;
  2         3  
  2         67  
9 2     2   932 use POSIX qw(:fcntl_h);
  2         9356  
  2         12  
10 2     2   2069 use File::Path;
  2         3  
  2         235  
11              
12             our $VERSION = '0.07';
13              
14             use Class::Accessor::Lite (
15 2         11 ro => [ qw(base_dir worker_id) ],
16 2     2   973 );
  2         1848  
17              
18             sub new {
19 2     2 1 11 my $klass = shift;
20 2         3 my %args = @_;
21 2 50       5 die "mandatory parameter:base_dir is missing"
22             unless $args{base_dir};
23             # create base_dir if necessary
24 2 50       17 if (! -e $args{base_dir}) {
25 0 0       0 mkpath $args{base_dir}
26             or die "failed to create directory:$args{base_dir}:$!";
27             }
28             # build object
29             my $self = bless {
30 16     16   409 worker_id => sub { $$ },
31 2         13 %args,
32             }, $klass;
33             # remove my status file, just in case
34 2         6 unlink $self->_build_filename();
35            
36 2         6 return $self;
37             }
38              
39             sub DESTROY {
40 2     2   60 my $self = shift;
41             # if file is open, close and unlink
42 2 50       8 if ($self->{fh}) {
43 2         22 close $self->{fh};
44             # during global destruction we may already have lost this
45 2 100       22 unlink $self->_build_filename() if ($self->{base_dir});
46             }
47             }
48              
49             sub update {
50 2     2 1 13 my ($self, $status) = @_;
51             # open file at the first invocation (tmpfn => lock => rename)
52 2         5 my $id = $self->worker_id->();
53 2 50 33     7 if ($self->{fh} && $self->{id_for_fh} ne $id) {
54             # fork? close but do not unlock
55 0         0 close $self->{fh};
56 0         0 undef $self->{fh};
57             }
58 2 50       6 unless ($self->{fh}) {
59 2         7 my $fn = $self->_build_filename();
60 2 50       128 open my $fh, '>', "$fn.tmp"
61             or die "failed to open file:$fn.tmp:$!";
62 2         22 autoflush $fh 1;
63 2 50       102 flock $fh, LOCK_EX
64             or die "failed to flock file:$fn.tmp:$!";
65 2 50       100 rename "$fn.tmp", $fn
66             or die "failed to rename file:$fn.tmp to $fn:$!";
67 2         4 $self->{fh} = $fh;
68 2         9 $self->{id_for_fh} = $id;
69             }
70             # write to file with size of the status and its checksum
71 2 50       16 seek $self->{fh}, 0, SEEK_SET
72             or die "seek failed:$!";
73 2         3 print {$self->{fh}} (
  2         149  
74             md5($status),
75             pack("N", length $status),
76             $status,
77             );
78             }
79              
80             sub read_all {
81 3     3 1 2001364 my $self = shift;
82 3         15 my %ret;
83             $self->_for_all(
84             sub {
85 8     8   35 my ($id, $fh) = @_;
86             # detect collision using md5
87 8         19 for (1..10) {
88 8 50       33 seek $fh, 0, SEEK_SET
89             or die "seek failed:$!";
90 8         15 my $data = do { local $/; join '', <$fh> };
  8         40  
  8         236  
91             # silently ignore if data is too short
92 8 50       36 return if length($data) < 16 + 4;
93             # parse input
94 8         22 my $md5 = substr($data, 0, 16);
95 8         45 my $size = unpack("N", substr($data, 16, 4));
96 8         14 my $status = substr($data, 20, $size);
97             # compare md5 to detect collision
98             next
99 8 50       67 if md5($status) ne $md5;
100             # have read correct data, save and return
101 8         33 $ret{$id} = $status;
102 8         33 return;
103             }
104             # failed to read data in 10 consecutive attempts, bug?
105 0         0 warn "failed to read status of id:$id, skipping";
106             }
107 3         73 );
108 3         42 \%ret;
109             }
110              
111             sub cleanup {
112 0     0 1 0 my $self = shift;
113 0     0   0 $self->_for_all(sub {});
  0         0  
114             }
115              
116             sub _for_all {
117 3     3   12 my ($self, $cb) = @_;
118 3         588 my @files = glob "$self->{base_dir}/status_*";
119 3         39 for my $fn (@files) {
120             # obtain id from filename (or else ignore)
121 9 50       132 $fn =~ m|/status_(.*)$|
122             or next;
123 9         41 my $id = $1;
124             # ignore files removed after glob but before open
125 9 50       377 open my $fh, '+<', $fn
126             or next;
127             # check if the file is still opened by the owner process using flock
128 9 100 100     51 if ($id ne $self->worker_id->() && flock $fh, LOCK_EX | LOCK_NB) {
129             # the owner has died, remove status file
130 1         13 close $fh;
131 1 50 0     108 unlink $fn
132             or
133             not $!{ENOENT} and
134             warn "failed to remove an obsolete scoreboard file:$fn:$!";
135 1         9 next;
136             }
137             # invoke
138 8         27 $cb->($id, $fh);
139             # close
140 8         110 close $fh;
141             }
142             }
143              
144             sub _build_filename {
145 5     5   7 my $self = shift;
146 5         25 return "$self->{base_dir}/status_" . $self->worker_id->();
147             }
148              
149             1;
150             __END__