File Coverage

blib/lib/Brackup/GPGProcManager.pm
Criterion Covered Total %
statement 76 79 96.2
branch 17 26 65.3
condition 9 12 75.0
subroutine 15 15 100.0
pod 0 10 0.0
total 117 142 82.3


line stmt bran cond sub pod time code
1             package Brackup::GPGProcManager;
2 13     13   69 use strict;
  13         26  
  13         467  
3 13     13   75 use warnings;
  13         31  
  13         434  
4 13     13   7163 use Brackup::GPGProcess;
  13         44  
  13         433  
5 13     13   86 use POSIX ":sys_wait_h";
  13         28  
  13         78  
6              
7             sub new {
8 3     3 0 10 my ($class, $iter, $target) = @_;
9 3         83 return bless {
10             chunkiter => $iter,
11             procs => {}, # "addr(pchunk)" => GPGProcess
12             target => $target,
13             procs_running => {}, # pid -> GPGProcess
14             uncollected_bytes => 0,
15             uncollected_chunks => 0,
16             }, $class;
17             }
18              
19             sub enc_chunkref_of {
20 33     33 0 76 my ($self, $pchunk) = @_;
21              
22 33         623 my $proc = $self->{procs}{$pchunk};
23 33 100       712 unless ($proc) {
24             # catch iterator up to the point that was
25             # requested, or blow up.
26 3         9 my $found = 0;
27 3         8 my $iters = 0;
28 3         22 while (my $ich = $self->{chunkiter}->next) {
29 3 50       14 if ($ich == $pchunk) {
30 3         6 $found = 1;
31 3         8 last;
32             }
33 0         0 $iters++;
34 0         0 warn "iters = $iters\n";
35             }
36 3 50       11 die "Not found" unless $found;
37 3         34 $proc = $self->gen_process_for($pchunk);
38             }
39              
40 33         1549 while ($proc->running) {
41 13 50       1717 my $pid = $self->wait_for_a_process(1) or die
42             "No processes were reaped!";
43             }
44              
45 33         394 $self->_proc_summary_dump;
46 33         190 my ($cref, $enc_length) = $self->get_proc_chunkref($proc);
47 33         880 $self->_proc_summary_dump;
48 33         154 $self->start_some_processes;
49              
50 33         1129 return ($cref, $enc_length);
51             }
52              
53             sub start_some_processes {
54 33     33 0 57 my $self = shift;
55              
56             # eat up any pending zombies
57 33         170 while ($self->wait_for_a_process(0)) {}
58              
59 33         343 my $pchunk;
60             # TODO: make this stuff configurable/auto-tuned
61 33   66     419 while ($self->num_running_procs < 5 &&
      66        
      66        
62             $self->uncollected_chunks < 20 &&
63             $self->num_uncollected_bytes < 128 * 1024 * 1024 &&
64             ($pchunk = $self->next_chunk_to_encrypt)) {
65 36         210 $self->_proc_summary_dump;
66 36         248 $self->gen_process_for($pchunk);
67 36         1059 $self->_proc_summary_dump;
68             }
69             }
70              
71             sub _proc_summary_dump {
72 138     138   1125 my $self = shift;
73 138 50       653 return unless $ENV{GPG_DEBUG};
74              
75 0         0 printf STDERR "num_running=%d, num_outstanding_bytes=%d uncollected_chunks=%d\n",
76             $self->num_running_procs, $self->num_uncollected_bytes, $self->uncollected_chunks;
77             }
78              
79             sub next_chunk_to_encrypt {
80 57     57 0 503 my $self = shift;
81 57         2444 while (my $ev = $self->{chunkiter}->next) {
82 40 100       806 next if $ev->isa("Brackup::File");
83 36         76 my $pchunk = $ev;
84 36 50       982 next if $self->{target}->stored_chunk_from_inventory($pchunk);
85 36         757 return $pchunk;
86             }
87 21         367 return undef;
88             }
89              
90             sub get_proc_chunkref {
91 33     33 0 102 my ($self, $proc) = @_;
92 33         203 my $cref = $proc->chunkref;
93 33         171 delete $self->{procs}{$proc};
94 33         134 $self->{uncollected_bytes} -= $proc->size_on_disk;
95 33         1773 $self->{uncollected_chunks}--;
96 33         128 return ($cref, $proc->size_on_disk);
97             }
98              
99             # returns PID of a process that finished
100             sub wait_for_a_process {
101 72     72 0 149 my ($self, $block) = @_;
102 72 100       358 my $flags = $block ? 0 : WNOHANG;
103 72         241137 my $kid = waitpid(-1, $flags);
104 72 100 100     991 return 0 if ! $block && $kid <= 0;
105 39 50       194 die "no child?" if $kid < 0;
106 39 50       127 return 0 unless $kid;
107              
108 39 50       267 my $proc = $self->{procs_running}{$kid} or die "Unknown child
109             process $kid finished!\n";
110              
111 39 50       256 delete $self->{procs_running}{$proc->pid} or die;
112 39         182 $proc->note_stopped;
113 39         381 $self->{uncollected_bytes} += $proc->size_on_disk;
114              
115 39         3198 return $kid;
116             }
117              
118 57     57 0 1316 sub num_uncollected_bytes { $_[0]{uncollected_bytes} }
119              
120 57     57 0 675 sub uncollected_chunks { $_[0]{uncollected_chunks} }
121              
122             sub gen_process_for {
123 39     39 0 102 my ($self, $pchunk) = @_;
124 39         726 my $proc = Brackup::GPGProcess->new($pchunk);
125 39         5658 $self->{procs_running}{$proc->pid} = $proc;
126 39         1231 $self->{procs}{$pchunk} = $proc;
127 39         1564 $self->{uncollected_chunks}++;
128 39         638 return $proc;
129             }
130              
131             sub num_running_procs {
132 69     69 0 182 my $self = shift;
133 69         126 return scalar keys %{$self->{procs_running}};
  69         1683  
134             }
135              
136             1;
137