File Coverage

blib/lib/Sys/Bprsync/Worker.pm
Criterion Covered Total %
statement 11 13 84.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 16 18 88.8


line stmt bran cond sub pod time code
1             package Sys::Bprsync::Worker;
2             {
3             $Sys::Bprsync::Worker::VERSION = '0.25';
4             }
5             BEGIN {
6 1     1   2033 $Sys::Bprsync::Worker::AUTHORITY = 'cpan:TEX';
7             }
8             # ABSTRACT: bprsync worker, does all the work
9              
10 1     1   23 use 5.010_000;
  1         4  
  1         39  
11 1     1   6 use mro 'c3';
  1         3  
  1         5  
12 1     1   24 use feature ':5.10';
  1         2  
  1         80  
13              
14 1     1   526 use Moose;
  0            
  0            
15             use namespace::autoclean;
16              
17             # use IO::Handle;
18             # use autodie;
19             # use MooseX::Params::Validate;
20             use English qw( -no_match_vars );
21              
22             use Carp;
23             use File::Blarf;
24              
25             use Sys::Run;
26              
27             extends 'Job::Manager::Worker';
28              
29             has 'parent' => (
30             'is' => 'ro',
31             'isa' => 'Sys::Bprsync',
32             'required' => 1,
33             );
34              
35             has 'sys' => (
36             'is' => 'rw',
37             'isa' => 'Sys::Run',
38             'lazy' => 1,
39             'builder' => '_init_sys',
40             );
41              
42             has 'name' => (
43             'is' => 'ro',
44             'isa' => 'Str',
45             'required' => 1,
46             );
47              
48             has '_job_prefix' => (
49             'is' => 'ro',
50             'isa' => 'Str',
51             'lazy' => 1,
52             'builder' => '_init_job_prefix',
53             );
54              
55             sub _init_job_prefix {
56             return 'Jobs';
57             }
58              
59             # ArrayRef[Str] - not required
60             foreach my $key (qw(execpre execpost exclude)) {
61             has $key => (
62             'is' => 'ro',
63             'isa' => 'ArrayRef[Str]',
64             'required' => 0,
65             'default' => sub { [] },
66             );
67             }
68              
69             # Str - not required
70             foreach my $key (qw(description source destination timeframe excludefrom options rsh rshopts)) {
71             has $key => (
72             'is' => 'ro',
73             'isa' => 'Str',
74             'required' => 0,
75             'clearer' => 'clear_'.$key,
76             'predicate' => 'has_'.$key,
77             );
78             }
79              
80             # Bool - not required - default 0
81             foreach my $key (qw(compression numericids verbose delete nocrossfs hardlink dry sudo)) {
82             has $key => (
83             'is' => 'ro',
84             'isa' => 'Bool',
85             'required' => 0,
86             'clearer' => 'clear_'.$key,
87             'predicate' => 'has_'.$key,
88             );
89             }
90              
91             # Int - not required
92             foreach my $key (qw(bwlimit)) {
93             has $key => (
94             'is' => 'ro',
95             'isa' => 'Int',
96             'required' => 0,
97             'clearer' => 'clear_'.$key,
98             'predicate' => 'has_'.$key,
99             );
100             }
101             has 'runloops' => (
102             'is' => 'ro',
103             'isa' => 'Int',
104             'default' => 3,
105             );
106              
107             has 'loop_status' => (
108             'is' => 'ro',
109             'isa' => 'HashRef',
110             'default' => sub { {} },
111             );
112              
113             has 'logfile' => (
114             'is' => 'ro',
115             'isa' => 'Str',
116             'lazy' => 1,
117             'builder' => '_init_logfile',
118             );
119              
120             has '_init_done' => (
121             'is' => 'rw',
122             'isa' => 'Bool',
123             'default' => 0,
124             );
125              
126             sub _init_sys {
127             my $self = shift;
128              
129             return $self->parent()->sys();
130             }
131              
132             sub _init {
133             my $self = shift;
134              
135             return 1 if $self->_init_done();
136              
137             # ok, now we have a config and a job name, we should be able to
138             # get everything else from the config ...
139             # scalars ...
140             my $common_config_prefix = $self->parent()->config_prefix() . q{::} . $self->_job_prefix() . q{::} . $self->name() . q{::};
141             foreach my $key (qw(description timeframe excludefrom rsh rshopts compression options delete numericids bwlimit source destination nocrossfs hardlink sudo)) {
142             my $predicate = 'has_'.$key;
143             if ( !$self->$predicate() ) {
144             my $config_key = $common_config_prefix . $key;
145             my $val = $self->parent()->config()->get($config_key);
146             if ( defined($val) ) {
147             $self->parent()->logger()->log( message => 'Set '.$key.' ('.$config_key.') for job ' . $self->name() . ' to '.$val, level => 'debug', );
148             $self->{$key} = $val;
149             }
150             else {
151             my $msg = 'Recommended configuration key '.$key.' ('.$config_key.') not found!';
152             $self->parent()->logger()->log( message => $msg, level => 'debug', );
153             }
154             }
155             else {
156             $self->parent()->logger()->log( message => "Configration key $key is " . $self->{$key}, level => 'debug', );
157             }
158             }
159              
160             # arrays ...
161             foreach my $key (qw(execpre execpost exclude)) {
162             if ( !defined( $self->{$key} ) || ref( $self->{$key} ) ne 'ARRAY' || scalar( @{ $self->{$key} } ) < 1 ) {
163             my $config_key = $common_config_prefix . $key;
164             my @vals = $self->parent()->config()->get_array($config_key);
165             if (@vals) {
166             $self->parent()->logger()->log( message => 'Set '.$key.' ('.$config_key.') for job ' . $self->name() . ' to ' . join( q{:}, @vals ), level => 'debug', );
167             $self->{$key} = [@vals] if @vals;
168             }
169             }
170             else {
171             $self->parent()->logger()->log( message => 'Configration key '.$key.' is ' . $self->{$key}, level => 'debug', );
172             }
173             }
174              
175             if ( !$self->source() || !$self->destination() ) {
176             croak('Missing source or destination!');
177             }
178              
179             $self->_init_done(1);
180              
181             return 1;
182             }
183              
184             sub _init_logfile {
185             my $self = shift;
186              
187             return $self->parent()->logfile() . '.rsync.' . $PID;
188             }
189              
190             sub run {
191             my $self = shift;
192              
193             $self->_init();
194             $self->_prepare();
195             if ( !$self->_exec_pre() ) {
196             $self->logger()->log( message => 'Exec-Pre failed', level => 'error', );
197             return;
198             }
199             my $status = $self->_mainloop();
200             $self->_cleanup($status);
201             $self->_exec_post();
202             return $status;
203             }
204              
205             sub _prepare {
206             my $self = shift;
207              
208             # Nothing to do.
209             return 1;
210             }
211              
212             sub _cleanup {
213             my $self = shift;
214             my $status = shift;
215              
216             # cleanup logfile
217             if ( -e $self->logfile() ) {
218             my $target = $self->parent()->logfile() . '.rsync';
219             if ( File::Blarf::cat( $self->logfile(), $target, { Flock => 1, Append => 1, } ) ) {
220             $self->logger()->log( message => 'Appended temporary logfile (' . $self->logfile() . ') to '.$target, level => 'debug', );
221             my $cmd = 'rm -f '.$self->logfile();
222             if($self->sys()->run_cmd($cmd)) {
223             $self->logger()->log( message => 'Removed temporary logfile: '.$self->logfile(), level => 'debug', );
224             } else {
225             $self->logger()->log( message => 'Failed to remove temporary logfile: '.$self->logfile(), level => 'warning', );
226             }
227             }
228             else {
229             $self->logger()->log( message => 'Failed to append temporary logfile (' . $self->logfile() . ') to '.$target, level => 'warning', );
230             }
231             }
232             else {
233             $self->logger()->log( message => 'No temporary logfile found at ' . $self->logfile(), level => 'notice', );
234             }
235              
236             # Nothing to do.
237             return 1;
238             }
239              
240             sub _exec_pre {
241             my $self = shift;
242              
243             my $ok = 1;
244             foreach my $cmd ( @{ $self->execpre() } ) {
245             if ( $self->sys()->run_cmd($cmd) ) {
246             $self->logger()->log( message => 'Executed CMD '.$cmd.' w/ success.', level => 'debug', );
247             }
248             else {
249             $self->logger()->log( message => 'Could not execute CMD '.$cmd.' w/o error.', level => 'error', );
250             $ok = 0;
251             }
252             }
253             return $ok;
254             }
255              
256             sub _exec_post {
257             my $self = shift;
258              
259             foreach my $cmd ( @{ $self->execpost() } ) {
260             if ( $self->sys()->run_cmd($cmd) ) {
261             $self->logger()->log( message => 'Executed CMD '.$cmd.' w/ success.', level => 'debug', );
262             }
263             else {
264             $self->logger()->log( message => 'Could not execute CMD '.$cmd.' w/o error.', level => 'error', );
265             }
266             }
267              
268             return 1;
269             }
270              
271             sub _rsync_cmd {
272             my $self = shift;
273              
274             my $cmd = q{};
275             $cmd .= $self->parent()->get_cmd_prefix();
276             $cmd .= '/usr/bin/rsync';
277              
278             my $rsyncd_mode = 0;
279             if ( $self->source() =~ m/::/ || $self->destination() =~ m/::/ || ( $self->rsh() && $self->rsh() =~ m/rsyncd/ ) ) {
280             $rsyncd_mode = 1;
281             }
282              
283             my $opts = q{};
284             if ( $self->excludefrom() ) {
285             $opts .= " --exclude-from=" . $self->excludefrom();
286             }
287             if ( $self->exclude() ) {
288             my @excludes = @{ $self->exclude() };
289             if (@excludes) {
290             $opts .= ' --exclude="';
291             $opts .= join( '" --exclude="', @excludes );
292             $opts .= q{"};
293             }
294             }
295             if ($rsyncd_mode) {
296             $self->logger()->log( message => 'Skipping rsh handling, using rsyncd mode.', level => 'debug', );
297             if ( $self->rshopts() ) {
298              
299             # for e.g. password-file
300             $opts .= q{ } . $self->rshopts();
301             }
302             } else { # ssh mode
303             if ( $self->rsh() ) {
304             $opts .= ' -e "' . $self->rsh();
305             if ( $self->rsh() eq 'ssh' ) {
306             $opts .= $self->sys()->_ssh_opts();
307             }
308             }
309             else {
310             $opts .= ' -e "ssh '.$self->sys()->_ssh_opts();
311             }
312             if ( $self->rshopts() ) {
313             $opts .= q{ } . $self->rshopts();
314             }
315             $opts .= q{"}; # finish args to -e (remote shell)
316             }
317             $opts .= ' -a'; # always set archive mode
318             if ( $self->hardlink() ) {
319             $opts .= ' -H';
320             }
321             if ( $self->nocrossfs() ) {
322             $opts .= ' -x';
323             }
324             if ( $self->verbose() ) {
325             $opts .= ' -v';
326             }
327             if ( $self->compression() ) {
328             $opts .= ' -z';
329             }
330             if ( $self->options() ) {
331             $self->parent()->logger()->log(
332             message => q{DEPRECATION WARNING: The use of 'options' is deprecated! Please don't use it anymore! There are individual options now.},
333             level => 'warning'
334             );
335              
336             # don't prepend '-' if already present
337             if ( $self->options() =~ m/^\s*-/ ) {
338             $opts .= q{ } . $self->options();
339             }
340             else {
341             $opts .= q{ -} . $self->options();
342             }
343             }
344             if ( $self->numericids() ) {
345             $opts .= ' --numeric-ids';
346             }
347             if ( $self->bwlimit() ) {
348             $opts .= ' --bwlimit=' . $self->bwlimit();
349             }
350             if ( $self->delete() ) {
351             $opts .= ' --delete';
352             }
353             if ( $self->dry() ) {
354             $opts .= ' --dry-run';
355             }
356              
357             my $dirs = q{};
358             $dirs .= q{ } . $self->source();
359             $dirs .= q{ } . $self->destination();
360              
361             my @cmd = ( $cmd, $opts, $dirs );
362              
363             return wantarray ? @cmd : join( q{}, @cmd );
364             }
365              
366             sub _mainloop {
367             my $self = shift;
368             my %status = ();
369              
370             my $cmd = $self->_rsync_cmd();
371              
372             foreach my $runloop ( 1 .. $self->runloops() ) {
373             last if ( !$self->_check_timeframe() );
374             $self->parent()->logger()->log(
375             message => 'Job: [' . $self->name() . '] ' . $self->description . ' (Runloop: '.$runloop.q{/} . $self->runloops() . ') starting ...',
376             level => 'debug'
377             );
378              
379             $self->parent()->logger()->log( message => 'Starting ' . $self->description . q{ - } . $cmd, level => 'debug', );
380             $self->parent()->logger()->log( message => 'CMD: '.$cmd, level => 'debug', );
381             $self->loop_status()->{$runloop}->{'time_start'} = time();
382              
383             my $opts = {
384             'Logfile' => $self->logfile(),
385             'ReturnRV' => 1,
386             'Timeout' => 60 * 60 * 23, # 23h
387             };
388              
389             my $rv;
390             if ( $self->parent()->config()->get( $self->parent()->config_prefix() . '::Dry' ) ) {
391             $rv = 0;
392             }
393             else {
394             $rv = $self->sys()->run_cmd( $cmd, $opts );
395             }
396              
397             my $reason = q{};
398             my $severity = 'debug';
399             if ( $self->parent()->rsync_codes()->{$rv} ) {
400             if ( $self->parent()->rsync_codes()->{$rv}[0] ) {
401             $severity = $self->parent()->rsync_codes()->{$rv}[0];
402             }
403             if ( $self->parent()->rsync_codes()->{$rv}[1] ) {
404             $reason = $self->parent()->rsync_codes()->{$rv}[1];
405             }
406             }
407             $severity ||= 'debug';
408             $self->parent()->logger()->log( message => 'Command finished with RV '.$rv.'. Reason: '.$reason, level => $severity, );
409             $self->loop_status()->{$runloop}->{'rv'} = $rv;
410             $self->loop_status()->{$runloop}->{'reason'} = $reason;
411             $self->loop_status()->{$runloop}->{'severity'} = $severity;
412             $self->loop_status()->{$runloop}->{'time_finish'} = time();
413              
414             # end loop if fatal or no error, otherwise loop again
415             if ( $self->parent()->rsync_codes()->{$rv}[0] eq 'fatal' ) {
416             $self->logger()
417             ->log( message => 'Exiting mainloop after runloop ' . $runloop . ' of ' . $self->runloops() . ' due to: a FATAL error', level => 'error', );
418             return;
419             }
420             elsif ( $self->parent()->rsync_codes()->{$rv}[0] ne 'error' ) {
421             $self->logger()
422             ->log( message => 'Exiting mainloop after runloop ' . $runloop . ' of ' . $self->runloops() . ' due to: SUCCESS', level => 'debug', );
423             return 1;
424             }
425             }
426              
427             $self->logger()->log(
428             message => 'Exiting mainloop after runloop ' . $self->runloops() . ' of ' . $self->runloops() . ' due to: no more runloops left',
429             level => 'debug',
430             );
431             return 1;
432             }
433              
434             sub _check_timeframe {
435             my $self = shift;
436              
437             ## no critic (ProhibitExcessComplexity)
438             my ( $from_hour, $from_min, $to_hour, $to_min ) = ( 0, 0, 0, 0 );
439             if ( $self->timeframe()
440             && $self->timeframe() =~ m/0?(\d?\d):0?(\d?\d)-0?(\d?\d):0?(\d?\d)/ )
441             {
442             $from_hour = $1;
443             $from_min = $2;
444             $to_hour = $3;
445             $to_min = $4;
446             my $now_min = ( localtime() )[1];
447             my $now_hour = ( localtime() )[2];
448             my $now_mday = ( localtime() )[3];
449             my $now_mon = ( localtime() )[4];
450             my $now_year = ( localtime() )[5];
451              
452             # Check if this job may run now
453             if (
454             (
455             (
456              
457             # from < to
458             ( $from_hour < $to_hour || ( $from_hour == $to_hour && $from_min < $to_min ) )
459             && (
460             ## now < from
461             ( $now_hour < $from_hour || ( $now_hour == $from_hour && $now_min < $from_min ) )
462             ## now > to
463             || ( $now_hour > $to_hour || ( $now_hour == $to_hour && $now_min > $to_min ) )
464             )
465             )
466             || (
467              
468             # from > to
469             ( $from_hour > $to_hour || ( $from_hour == $to_hour && $from_min > $to_min ) )
470             &&
471             ## now > to && now < from
472             ( $now_hour > $to_hour || ( $now_hour == $to_hour && $now_min > $to_min ) )
473             && ( $now_hour < $from_hour || ( $now_hour == $from_hour && $now_min < $from_min ) )
474             )
475             )
476             )
477             {
478             $self->parent()->logger()->log(
479             message => 'Skipping Job: '
480             . $self->description()
481             . ' because not within timeframe (time: '
482             . $now_hour . q{:}
483             . $now_min
484             . ', from: '
485             . $from_hour . q{:}
486             . $from_min
487             . ', to: '
488             . $to_hour . q{:}
489             . $to_min,
490             level => 'debug'
491             );
492             return;
493             }
494             }
495             ## use critic
496             return 1;
497             }
498              
499             no Moose;
500             __PACKAGE__->meta->make_immutable;
501              
502             1;
503              
504             __END__
505              
506             =pod
507              
508             =encoding UTF-8
509              
510             =head1 NAME
511              
512             Sys::Bprsync::Worker - bprsync worker, does all the work
513              
514             =head1 METHODS
515              
516             =head2 run
517              
518             Run a sync job.
519              
520             =head1 NAME
521              
522             Sys::Bprsync::Worker - a BPrsync worker
523              
524             =head1 AUTHOR
525              
526             Dominik Schulz <dominik.schulz@gauner.org>
527              
528             =head1 COPYRIGHT AND LICENSE
529              
530             This software is copyright (c) 2012 by Dominik Schulz.
531              
532             This is free software; you can redistribute it and/or modify it under
533             the same terms as the Perl 5 programming language system itself.
534              
535             =cut