File Coverage

blib/lib/MMM/MirrorTask.pm
Criterion Covered Total %
statement 24 271 8.8
branch 0 126 0.0
condition 0 37 0.0
subroutine 8 33 24.2
pod 15 15 100.0
total 47 482 9.7


line stmt bran cond sub pod time code
1             package MMM::MirrorTask;
2              
3             =head1 NAME
4              
5             MMM::MirrorTask Class to store mirror task function and data
6              
7             =cut
8              
9 3     3   39 use strict;
  3         7  
  3         123  
10 3     3   18 use warnings;
  3         8  
  3         94  
11 3     3   2135 use MMM::Sync;
  3         11  
  3         97  
12 3     3   2528 use MMM::Utils;
  3         12  
  3         230  
13 3     3   1495 use MMM::Config;
  3         8  
  3         320  
14 3     3   1179 use MMM::Mirror;
  3         6  
  3         162  
15 3     3   19 use Fcntl qw(:flock);
  3         219  
  3         651  
16 3     3   19 use Digest::MD5;
  3         6  
  3         18222  
17              
18             =head1 METHODS
19              
20             =head2 new
21              
22             =cut
23              
24             sub new {
25 0     0 1   my ($class, $mmm, $name, %options) = @_;
26 0           bless(
27             {
28             mmm => $mmm,
29             name => $name,
30             options => { %options },
31             lockcount => 0,
32             },
33             $class
34             );
35             }
36              
37             sub DESTROY {
38 0     0     my ($self) = @_;
39 0           $self->{lockcount} = 1; # Force unlock, can't happen
40 0           $self->unlock();
41             }
42              
43             =head2 name
44              
45             Return the name of the current task
46              
47             =cut
48              
49             sub name {
50 0     0 1   $_[0]->{name}
51             }
52              
53             =head2 is_disable
54              
55             Return true if the current task is disable
56              
57             =cut
58              
59 0     0 1   sub is_disable { yes_no($_[0]->val('disable')) }
60              
61             =head2 val( $var, $default)
62              
63             Return the configuration value for $var. Return $default if parameter
64             is not set in the config
65              
66             =cut
67              
68             sub val {
69 0     0 1   my ( $self, $var, $default ) = @_;
70 0           $self->{mmm}->{config}->val( $self->name, $var, $default );
71             }
72              
73             =head2 frequency
74              
75             Return the frequency value from config
76              
77             =cut
78              
79             sub frequency {
80 0     0 1   my ($self) = @_;
81 0           duration2m($self->val( 'period', PERIOD ));
82             }
83              
84             sub _set_status_time {
85 0     0     my ($config, $section, $var, $val) = @_;
86 0           $config->newval($section, $var, $val);
87 0           $config->SetParameterComment(
88             $section, $var, scalar(gmtime($val))
89             );
90             }
91              
92             =head2 state_info
93              
94             Return a hashref about job status
95              
96             =cut
97              
98             sub state_info {
99 0     0 1   my ($self, $status) = @_;
100 0   0       $status ||= $self->_get_status();
101 0           my %info = ();
102 0           foreach my $section (qw(job success failure)) {
103 0           foreach my $var ($status->Parameters($section)) {
104 0           $info{$section}{$var} = $status->val($section, $var);
105             }
106             }
107 0 0         if (yes_no($self->val('compute_size', 0))) {
108 0           $info{job}{size} = $status->val('job', 'size');
109             } else {
110 0           delete($info{job}{size});
111             }
112              
113 0           $info{job}{error_log} = [ grep { $_ } ($status->val('job', 'error_log')) ];
  0            
114 0           $info{job}{next_run_time} = $self->next_run_time($status);
115 0           $info{job}{is_running} = $self->is_running;
116 0           return %info;
117             }
118              
119             sub _compute_config_sum {
120 0     0     my ($self) = @_;
121 0           my $md5 = Digest::MD5->new();
122 0           foreach (qw(url source path)) {
123 0           $md5->add("$_=");
124 0           $md5->add(join("\n", $self->val('job', $_, '')));
125             }
126 0           return $md5->hexdigest
127             }
128              
129             sub _set_compute_config_sum {
130 0     0     my ($self, $status) = @_;
131 0   0       $status ||= $self->_get_status();
132 0           my $newsum = $self->_compute_config_sum();
133 0 0         if ($status->val('job', 'config_sum', '') ne $newsum) {
134 0           $status->newval('job', 'config_sum', $newsum);
135 0           return 1;
136             } else {
137 0           return 0;
138             }
139             }
140              
141             =head2 next_run_time
142              
143             Return the time (in second) when the next run should be performed
144              
145             =cut
146              
147             sub next_run_time {
148 0     0 1   my ($self, $status) = @_;
149 0           my @alltime = ( scalar( time() ) );
150 0   0       $status ||= $self->_get_status();
151 0           my $last_start = 0;
152 0 0         if ($self->_compute_config_sum() ne $status->val('job', 'config_sum', '')) {
153 0 0         $self->_log('INFO',
154             'Config for has changed, need to be run immediately')
155             if($status->val('job', 'start', 0));
156             } else {
157 0           $last_start = $status->val( 'job' , 'start', 0 );
158             }
159              
160 0 0         if ( $last_start ) {
161 0           push( @alltime, $last_start + ( $self->frequency * 60 ) );
162             }
163              
164 0 0 0       if ( $self->val('waitafter', WAITAFTER_MINIMA)
165             && $status->val( 'job' , 'end' ) )
166             {
167 0           push( @alltime,
168             $status->val( 'job', 'end' ) +
169             $self->val('waitafter', WAITAFTER_MINIMA) * 60 );
170             }
171              
172 0 0 0       if ( $self->val('waitaftersuccess')
173             && $status->val( 'success', 'end' ) )
174             {
175 0           push( @alltime,
176             $status->val( 'job', 'success' ) +
177             $self->val('waitaftersuccess') * 60 );
178             }
179              
180 0           my ($t) = sort { $b <=> $a } @alltime;
  0            
181             # $self->_log('DEBUG', 'Next run time for %s is %d (in %d), frequency is %d',
182             # $self->{name}, $t, $t - scalar(time()),
183             # $self->frequency,
184             #);
185              
186 0           $t;
187             }
188              
189             =head2 source
190              
191             Return the source associate to the list, if any
192              
193             =cut
194              
195             sub source {
196 0 0   0 1   $_[0]->val('source') || '';
197             }
198              
199             sub _log {
200 0     0     my ($self, $level, $message, @args) = @_;
201 0           $self->{mmm}->log(
202             $level,
203             sprintf('[%s] %s', $self->name, $message),
204             @args
205             );
206             }
207              
208             sub _lockpath {
209 0     0     my ($self) = @_;
210 0           my $lockfile = $self->name;
211 0           $lockfile =~ s:/:_:g;
212 0           join('/', ($self->{mmm}->statedir, "/$lockfile.lck"));
213             }
214              
215             sub _statusfile {
216 0     0     my ($self) = @_;
217 0           my $lockfile = $self->name;
218 0           $lockfile =~ s:/:_:g;
219 0           join('/', ($self->{mmm}->statedir, $lockfile));
220             }
221              
222             =head2 getlock($share)
223              
224             Try to lock the lockfile for this task, in shared mode if
225             $share is set.
226              
227             =cut
228              
229             sub getlock {
230 0     0 1   my ($self, $share) = @_;
231 0 0         if ($self->{lockcount}) {
232 0           $self->{lockcount}++;
233 0           $self->_log(
234             'DEBUG',
235             'Lock is already done, counter is now %d', $self->{lockcount}
236             );
237 0           return $self->{lockcount};
238             }
239 0           $self->_log( 'DEBUG', 'Trying to acquire lock' );
240 0           $self->{lockfile} = $self->_lockpath;
241 0 0         if ( open( $self->{lockfh}, $share ? '<' : '>', $self->_lockpath) ) {
    0          
242 0 0         if ( !flock(
    0          
243             $self->{lockfh}, LOCK_NB | ( $share ? LOCK_SH : LOCK_EX ) ) ) {
244 0 0         if ( ( $! + 0 ) != 11 ) { # E_AGAIN, does this is really need
245 0           $self->_log(
246             'FATAL',
247             "Cannot lock file %s",
248             $self->_lockpath
249             );
250 0           unlink( $self->_lockpath );
251 0           close( $self->{lockfh} );
252 0           return;
253             }
254 0           $self->_log( 'DEBUG', 'is already lock' );
255 0           return;
256             }
257 0           my $fh = $self->{lockfh};
258 0 0         print $fh "$$\n" unless($share);
259             }
260             else {
261 0           $self->_log( 'FATAL',
262             'Cannot open lock file %s :%s', $self->{lockfile}, $!);
263 0           return;
264             }
265 0           ++$self->{lockcount};
266             }
267              
268             =head2 unlock
269              
270             Release the lock for the task
271              
272             =cut
273              
274             sub unlock {
275 0     0 1   my ($self) = @_;
276 0 0         $self->{lockfh} or return;
277 0 0         --$self->{lockcount} and return;
278 0           unlink( $self->{lockfile} );
279 0           close( $self->{lockfh} );
280             }
281              
282             =head2 is_running
283              
284             Return true is the task is running (lock check)
285              
286             =cut
287              
288             sub is_running {
289 0     0 1   my ($self) = @_;
290 0           my @stat = stat($self->_lockpath);
291 0 0         if ($self->{mmm}->_task_is_registred($self->name)) {
292 0   0       return $stat[9] || scalar(time);
293             }
294              
295 0 0         if (!defined($stat[9])) { return }
  0            
296             else {
297 0           my $res = $self->getlock(1);
298 0 0         if ($res) {
299 0           $self->unlock;
300 0 0         if ($res > 1) { return $stat[9]; }
  0            
301 0           else { return; }
302 0           } else { return $stat[9]; }
303             }
304             }
305              
306             sub _get_status {
307 0     0     my ($self) = @_;
308 0 0         Config::IniFiles->new(
    0          
309             -f $self->_statusfile ? ( -file => $self->_statusfile ) : ()
310             ) || Config::IniFiles->new();
311             }
312              
313             sub _write_status {
314 0     0     my ($self, $status) = @_;
315 0           $self->_log( 'DEBUG', 'Write status file: %s', $self->_statusfile );
316 0           $status->WriteConfig( $self->_statusfile );
317             }
318              
319             =head2 failure_count
320              
321             Return three values:
322             - the count of failure since last success
323             - the previous failure count
324             - and if this count has change between the two previous run
325             (eg if failure count is different of previous failure count)
326              
327             =cut
328              
329             sub failure_count {
330 0     0 1   my ($self, $status) = @_;
331 0 0 0       my $before = defined($self->{successive_failure_before})
332             ? $self->{successive_failure_before}
333             : ($status ||= $self->_get_status())->val('job', 'old_failure_count', 0);
334 0 0 0       my $after = defined($self->{successive_failure_after})
335             ? $self->{successive_failure_after}
336             : ($status ||= $self->_get_status())->val('job', 'successive_failure_count', 0);
337             return(
338 0 0         $before, $after, defined($after) ? $after != $before : undef
339             );
340             }
341              
342             =head2 sync
343              
344             Perform the synchronization
345              
346             =cut
347              
348             sub sync {
349 0     0 1   my ($self) = @_;
350 0           $self->_log('INFO', 'Start to process' );
351 0 0         $self->_log('DEBUG', 'goes into %s%s',
352             $self->dest,
353             $self->{options}{dryrun} ? ' (dryrun mode)' : '',
354             );
355              
356 0           my $oldname = $0;
357 0           $0 = 'mmm [' . $self->name . ']';
358            
359 0 0         $self->getlock() or return;
360              
361 0           my $status = $self->_get_status();
362 0           $self->_set_compute_config_sum($status);
363 0           my ($ouid, $ogid) = MMM::Utils::setid( $self->val('user'), $self->val('group') );
364              
365 0           $self->{successive_failure_before} =
366             $status->val('job', 'successive_failure_count', 0);
367 0           $status->newval('job', 'old_failure_count', $self->{successive_failure_before});
368 0           $status->delval( 'job', 'command');
369 0           $status->newval('job', 'processed_count',
370             $status->val('job', 'processed_count', 0) + 1
371             );
372              
373 0 0         if (!defined($status->val('job', 'first_sync'))) {
374 0           _set_status_time($status, 'job', 'first_sync', scalar( time() ));
375             }
376              
377 0           my $res = 0;
378              
379 0 0         if (! -d $self->dest) {
380 0           push(@{ $self->{output} }, sprintf('Directory %s does not exists (%s)',
  0            
381             $self->dest,
382             $self->name,
383             ));
384 0           foreach (qw(start end)) {
385 0           _set_status_time($status, 'job', $_, scalar( time() ));
386             }
387 0           return $res;
388             }
389              
390 0 0         if ($self->val('pre_exec')) {
391 0           my @cmd = ($self->val('pre_exec'), $self->name, $self->dest);
392 0           $self->_log('INFO', 'Executing PRE: %s',
393 0           join(' ', map { qq{"$_"} } (@cmd)));
394 0 0         if (system(@cmd) != 0) {
395 0 0         if ($? == -1) {
396 0           $self->_log('ERROR', 'failed to execute pre_exec: %s', $!);
397             } else {
398 0           $self->_log('ERROR',
399             'Pre_exec exited with value %d, abborting sync', $? >> 8);
400             }
401 0           return $res;
402             }
403             }
404              
405 0 0         if (my $url = $self->val('url')) {
406 0   0       $res = $self->_sync_url(
407             $status, $url,
408             password => $self->val('password') || undef,
409             use_ssh => yes_no($self->val('rsync_use_ssh')),
410             );
411             } else {
412 0           $self->_log('ERROR', 'No source or url' );
413 0           return $res
414             }
415              
416 0 0         $status->newval('job', 'success', $res ? 1 : 0);
417 0 0         if ($res) {
418 0 0         $self->_log('NOTICE', 'Sync done%s from %s',
419             $self->{options}{dryrun} ? ' (dryrun mode)' : '',
420             $status->val('success', 'sync_from'),
421             );
422 0           $status->newval('job', 'successive_failure_count', 0);
423             } else {
424 0           $self->_log('WARNING', 'Unable to sync');
425 0           $status->newval('job', 'successive_failure_count',
426             $status->val('job', 'successive_failure_count', 0) + 1
427             );
428 0 0         foreach (@{ $self->{output} ? $self->{output} : [ "No output from process" ]}) {
  0            
429 0           $self->_log('ERROR', $_);
430             }
431             }
432 0           $self->{successive_failure_after} =
433             $status->val('job', 'successive_failure_count', 0);
434              
435 0 0         if ($self->val('post_exec')) {
436 0           $ENV{MMM_RESULT} = $res;
437 0 0         if ($res) {
438 0           $ENV{MMM_FROM} = $status->val('success', 'sync_from');
439 0           $ENV{MMM_MIRROR} = $status->val('job', 'try_from');
440             }
441 0           my @cmd = ($self->val('post_exec'), $self->name, $self->dest);
442 0           $self->_log('INFO', 'Executing POST: %s',
443 0           join(' ', map { qq{"$_"} } (@cmd)));
444 0 0         if (system(@cmd) != 0) {
445 0 0         if ($? == -1) {
446 0           $self->_log('WARNING', 'failed to execute post_exec: %s', $!);
447             } else {
448 0           $self->_log('WARNING',
449             'Post_exec exited with value %d, abborting sync', $? >> 8);
450             }
451             }
452             }
453              
454 0 0 0       if (yes_no($self->val('compute_size', 0)) &&
455             scalar(time) > $status->val('job', 'size_time', 0) +
456             duration2m($self->val('size_delay', SIZE_DELAY) * 60)) {
457 0           $self->du_dest($status);
458             }
459              
460 0           MMM::Utils::setid($ouid, $ogid);
461              
462 0 0         $self->_write_status($status) unless($self->{options}{dryrun});
463              
464 0 0         if ($self->{mmm}->can('send_mail')) {
465 0 0 0       if ($status->val('job', 'old_failure_count', 0) !=
  0            
466             $status->val('job', 'successive_failure_count', 0) &&
467             grep { $status->val('job', 'successive_failure_count', 0) == $_ }
468             (0, $self->val('errors_mail', 3))
469             ) {
470              
471 0           $self->{mmm}->body_queue($self, $self, $self->state_info($status));
472 0           $self->{mmm}->send_mail();
473             }
474             }
475              
476 0           $self->unlock();
477 0           $0 = $oldname;
478 0           $res
479             }
480              
481             sub _sync_url {
482 0     0     my ($self, $status, $based_url, %options) = @_;
483              
484 0           my $url = $based_url;
485 0 0         $url =~ m:/$: or $url .= '/';
486 0 0         $url .= '/' . $self->val('subdir') if ($self->val('subdir'));
487 0 0         $url =~ m:/$: or $url .= '/';
488              
489 0           $self->_log('DEBUG', 'Try from mirror %s', $url);
490 0           _set_status_time($status, 'job', 'start', scalar( time() ));
491              
492 0           foreach my $val (
493             'bwlimit', # bandwidth limit in k
494             'timeout', # timeout
495             'rsync_opts', # specifics rsync options
496             'rsync_defaults', # defaults rsync options
497             'exclude', # excluded files/dir
498             'tempdir',
499             'partialdir',
500             ) {
501 0 0         if (my $v = $self->val($val)) {
502 0           $options{$val} = $v;
503             }
504             }
505 0           foreach my $val (
506             'delete-after', # deleting after ?
507             'delete', # delete removed files ?
508             'delete-excluded', # deleting excluded files ?
509             ) {
510 0           $options{$val} = yes_no($self->val($val));
511             }
512 0           my $sync = MMM::Sync->new(
513             $url,
514             $self->dest,
515             %options,
516             );
517              
518 0 0         if (my $m = MMM::Mirror->new(url => $url)) {
519 0           $status->newval('job', 'try_from', $m->host);
520             }
521              
522 0           my $sync_res;
523 0           my $max_try = $self->val( 'max_try', MAX_TRY );
524 0           $self->_log('DEBUG', 'running %s', join(' ', $sync->buildcmd()));
525 0           foreach my $trycount (1 .. $max_try) {
526 0           $sync->reset_output;
527 0 0         if ( $self->{options}{dryrun} ) {
528 0           $sync_res = 0;
529 0           sleep(10);
530             } else {
531 0           $sync_res = $sync->sync();
532             }
533 0 0         $self->_log($sync_res ? 'WARNING' : 'DEBUG',
534             'Try %d/%d, res: %d from mirror %s',
535             $trycount, $max_try, $sync_res,
536             $sync->{source}, # TODO: kill intrusive var access
537             );
538 0           $self->{output} = $sync->get_output;
539 0 0         if ($sync_res != 1) {
540             last
541 0           }
542             }
543              
544 0           $status->newval( 'job', 'command', join(' ', $sync->buildcmd) );
545              
546 0           _set_status_time($status, 'job', 'end', scalar( time() ) );
547              
548 0 0         my $concerned_section = $sync_res ? 'failure' : 'success';
549              
550 0           foreach (qw(start end)) {
551 0           _set_status_time($status, $concerned_section, $_,
552             $status->val('job', $_)
553             );
554             }
555 0           $status->newval($concerned_section, 'url', $url);
556 0           $status->newval($concerned_section, 'try_from', $status->val('job', 'try_from'));
557              
558 0 0         if ($sync_res == 0) {
559 0           $status->delval( 'job', 'error_log' );
560 0           $status->newval('success', 'sync_from', $status->val('job', 'try_from'));
561             } else {
562 0 0         if (@{ $self->{output} || []}) {
  0 0          
563 0           $status->newval('job', 'error_log',
564 0           (@{ $self->{output}} > 10)
565 0           ? (@{$self->{output}}[-9 .. -1], '...')
566 0 0         : (@{$self->{output}})
567             );
568             }
569 0           return 0;
570             }
571             }
572              
573             =head2 dest
574              
575             The destination directory for this mirror
576              
577             =cut
578              
579             sub dest {
580 0     0 1   my ($self) = @_;
581 0           return $self->val('path', $self->name);
582             }
583              
584             =head2 du_dest
585              
586             Perform du over the destination directory and store the result into status file
587              
588             =cut
589              
590             sub du_dest {
591 0     0 1   my ($self, $status) = @_;
592 0   0       $status ||= $self->_get_status();
593 0           $self->_log('DEBUG', 'Calculating size of %s', $self->dest);
594 0 0         $self->getlock() or return;
595 0 0         if (! -d $self->dest) { return }
  0            
596 0 0         open(my $handle, sprintf('\\du -s %s |', $self->dest)) or return;
597 0           my $line = <$handle>;
598 0 0 0       if ($line && $line =~ /^(\d+)/) {
599 0           $status->newval('job', 'size', $1);
600 0           $status->newval('job', 'size_time', scalar(time));
601             }
602 0           close($handle);
603 0           $self->_write_status($status);
604 0           $self->unlock;
605 0           return 1;
606             }
607              
608             =head1 STATUS FILE STRUCTURE
609              
610             =head2 job SECTION
611              
612             =over 4
613              
614             =item successed 1 if last run was ok
615              
616             =item size The size of the tree
617              
618             =item size_time Last time the size were checked
619              
620             =item first_sync
621              
622             The first time job is take into account
623              
624             =back
625              
626             =head2 success SECTION
627              
628             =item start
629              
630             =item end
631              
632             =head2 failure SECTION
633              
634             =cut
635              
636             1;