File Coverage

blib/lib/File/Rsync/Mirror/Recentfile.pm
Criterion Covered Total %
statement 778 1008 77.1
branch 293 484 60.5
condition 108 184 58.7
subroutine 80 87 91.9
pod 39 39 100.0
total 1298 1802 72.0


line stmt bran cond sub pod time code
1             package File::Rsync::Mirror::Recentfile;
2              
3             # use warnings;
4 8     8   55737 use strict;
  8         23  
  8         581  
5              
6             =encoding utf-8
7              
8             =head1 NAME
9              
10             File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
11              
12             =cut
13              
14             my $HAVE = {};
15             for my $package (
16             "Data::Serializer",
17             "File::Rsync"
18             ) {
19             $HAVE->{$package} = eval qq{ require $package; };
20             }
21 8     8   99 use Config;
  8         15  
  8         355  
22 8     8   45 use File::Basename qw(basename dirname fileparse);
  8         50  
  8         755  
23 8     8   1469 use File::Copy qw(cp);
  8         9463  
  8         468  
24 8     8   42 use File::Path qw(mkpath);
  8         13  
  8         385  
25 8     8   4520 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
  8         19  
  8         1814  
26 8     8   42169 use File::Temp;
  8         181432  
  8         632  
27 8     8   49 use List::Util qw(first max min);
  8         16  
  8         728  
28 8     8   32 use Scalar::Util qw(reftype);
  8         10  
  8         291  
29 8     8   6746 use Storable;
  8         36460  
  8         483  
30 8     8   17521 use Time::HiRes qw();
  8         11959  
  8         207  
31 8     8   5951 use YAML::Syck;
  8         14926  
  8         480  
32              
33 8     8   43 use version; our $VERSION = qv('0.0.8');
  8         14  
  8         50  
34              
35 8     8   747 use constant MAX_INT => ~0>>1; # anything better?
  8         9  
  8         893  
36 8     8   39 use constant DEFAULT_PROTOCOL => 1;
  8         15  
  8         5854  
37              
38             # cf. interval_secs
39             my %seconds;
40              
41             # maybe subclass if this mapping is bad?
42             my %serializers;
43              
44             =head1 SYNOPSIS
45              
46             Writer (of a single file):
47              
48             use File::Rsync::Mirror::Recentfile;
49             my $rf = File::Rsync::Mirror::Recentfile->new
50             (
51             interval => q(6h),
52             filenameroot => "RECENT",
53             comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54             localroot => "/home/ftp/pub/PAUSE/authors/",
55             aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
56             );
57             $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
58              
59             Reader/mirrorer:
60              
61             my $rf = File::Rsync::Mirror::Recentfile->new
62             (
63             filenameroot => "RECENT",
64             interval => q(6h),
65             localroot => "/home/ftp/pub/PAUSE/authors",
66             remote_dir => "",
67             remote_host => "pause.perl.org",
68             remote_module => "authors",
69             rsync_options => {
70             compress => 1,
71             'rsync-path' => '/usr/bin/rsync',
72             links => 1,
73             times => 1,
74             'omit-dir-times' => 1,
75             checksum => 1,
76             },
77             verbose => 1,
78             );
79             $rf->mirror;
80              
81             Aggregator (usually the writer):
82              
83             my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
84             $rf->aggregate;
85              
86             =head1 DESCRIPTION
87              
88             Lower level than F:R:M:Recent, handles one recentfile. Whereas a tree
89             is always composed of several recentfiles, controlled by the
90             F:R:M:Recent object. The Recentfile object has to do the bookkeeping
91             for a single timeslice.
92              
93             =head1 EXPORT
94              
95             No exports.
96              
97             =head1 CONSTRUCTORS / DESTRUCTOR
98              
99             =head2 my $obj = CLASS->new(%hash)
100              
101             Constructor. On every argument pair the key is a method name and the
102             value is an argument to that method name.
103              
104             If a recentfile for this resource already exists, metadata that are
105             not defined by the constructor will be fetched from there as soon as
106             it is being read by recent_events().
107              
108             =cut
109              
110             sub new {
111 542     542 1 3309363 my($class, @args) = @_;
112 542         4101 my $self = bless {}, $class;
113 542         4456 while (@args) {
114 1700         10250 my($method,$arg) = splice @args, 0, 2;
115 1700         12960 $self->$method($arg);
116             }
117 542 50       7966 unless (defined $self->protocol) {
118 542         6167 $self->protocol(DEFAULT_PROTOCOL);
119             }
120 542 100       5941 unless (defined $self->filenameroot) {
121 517         6735 $self->filenameroot("RECENT");
122             }
123 542 100       6262 unless (defined $self->serializer_suffix) {
124 522         6222 $self->serializer_suffix(".yaml");
125             }
126 542         5151 return $self;
127             }
128              
129             =head2 my $obj = CLASS->new_from_file($file)
130              
131             Constructor. $file is a I<recentfile>.
132              
133             =cut
134              
135             sub new_from_file {
136 1109     1109 1 496720 my($class, $file) = @_;
137 1109         3618 my $self = bless {}, $class;
138 1109         5327 $self->_rfile($file);
139             #?# $self->lock;
140 1109 50       6618 my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
  1109         48370  
141 1109         6027 local $/;
142 1109         47927 <$fh>;
143             };
144             # XXX: we can skip this step when the metadata are sufficient, but
145             # we cannot parse the file without some magic stuff about
146             # serialized formats
147 1109         17233 while (-l $file) {
148 31         974 my($name,$path) = fileparse $file;
149 31         281 my $symlink = readlink $file;
150 31 50       131 if ($symlink =~ m|/|) {
151 0         0 die "FIXME: filenames containing '/' not supported, got $symlink";
152             }
153 31         722 $file = File::Spec->catfile ( $path, $symlink );
154             }
155 1109         115143 my($name,$path,$suffix) = fileparse $file, keys %serializers;
156 1109         6352 $self->serializer_suffix($suffix);
157 1109         8683 $self->localroot($path);
158 1109 50       7776 die "Could not determine file format from suffix" unless $suffix;
159 1109         1993 my $deserialized;
160 1109 50       4103 if ($suffix eq ".yaml") {
    0          
161 1109         9137 require YAML::Syck;
162 1109         6006 $deserialized = YAML::Syck::LoadFile($file);
163             } elsif ($HAVE->{"Data::Serializer"}) {
164             my $serializer = Data::Serializer->new
165 0         0 ( serializer => $serializers{$suffix} );
166 0         0 $deserialized = $serializer->raw_deserialize($serialized);
167             } else {
168 0         0 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
169             }
170 1109         1056056 while (my($k,$v) = each %{$deserialized->{meta}}) {
  13119         88930  
171 12010 100       31547 next if $k ne lc $k; # "Producers"
172 10901         32768 $self->$k($v);
173             }
174 1109 50       3896 unless (defined $self->protocol) {
175 0         0 $self->protocol(DEFAULT_PROTOCOL);
176             }
177 1109         35426 return $self;
178             }
179              
180             =head2 DESTROY
181              
182             A simple unlock.
183              
184             =cut
185             sub DESTROY {
186 5198     5198   101688358 my $self = shift;
187 5198         27161 $self->unlock;
188 5198 100       60088 unless ($self->_current_tempfile_fh) {
189 5194 100       41551 if (my $tempfile = $self->_current_tempfile) {
190 119 100       13763 if (-e $tempfile) {
191             # unlink $tempfile; # may fail in global destruction
192             }
193             }
194             }
195             }
196              
197             =head1 ACCESSORS
198              
199             =cut
200              
201             my @accessors;
202              
203             BEGIN {
204 8     8   39 @accessors = (
205             "_current_tempfile",
206             "_current_tempfile_fh",
207             "_delayed_operations",
208             "_done",
209             "_interval",
210             "_is_locked",
211             "_localroot",
212             "_merged",
213             "_pathdb",
214             "_remember_last_uptodate_call",
215             "_remote_dir",
216             "_remoteroot",
217             "_requires_fsck",
218             "_rfile",
219             "_rsync",
220             "__verified_tempdir",
221             "_seeded",
222             "_uptodateness_ever_reached",
223             "_use_tempfile",
224             );
225              
226 8         333 my @pod_lines =
227 8         26 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
  1152         2114  
228              
229             =over 4
230              
231             =item aggregator
232              
233             A list of interval specs that tell the aggregator which I<recentfile>s
234             are to be produced.
235              
236             =item canonize
237              
238             The name of a method to canonize the path before rsyncing. Only
239             supported value is C<naive_path_normalize>. Defaults to that.
240              
241             =item comment
242              
243             A comment about this tree and setup.
244              
245             =item dirtymark
246              
247             A timestamp. The dirtymark is updated whenever an out of band change
248             on the origin server is performed that violates the protocol. Say,
249             they add or remove files in the middle somewhere. Slaves must react
250             with a devaluation of their C<done> structure which then leads to a
251             full re-sync of all files. Implementation note: dirtymark may increase
252             or decrease.
253              
254             =item filenameroot
255              
256             The (prefix of the) filename we use for this I<recentfile>. Defaults to
257             C<RECENT>. The string must not contain a directory separator.
258              
259             =item have_mirrored
260              
261             Timestamp remembering when we mirrored this recentfile the last time.
262             Only relevant for slaves.
263              
264             =item ignore_link_stat_errors
265              
266             If set to true, rsync errors are ignored that complain about link stat
267             errors. These seem to happen only when there are files missing at the
268             origin. In race conditions this can always happen, so it defaults to
269             true.
270              
271             =item is_slave
272              
273             If set to true, this object will fetch a new recentfile from remote
274             when the timespan between the last mirror (see have_mirrored) and now
275             is too large (see C<ttl>).
276              
277             =item keep_delete_objects_forever
278              
279             The default for delete events is that they are passed through the
280             collection of recentfile objects until they reach the Z file. There
281             they get dropped so that the associated file object ceases to exist at
282             all. By setting C<keep_delete_objects_forever> the delete objects are
283             kept forever. This makes the Z file larger but has the advantage that
284             slaves that have interrupted mirroring for a long time still can clean
285             up their copy.
286              
287             =item locktimeout
288              
289             After how many seconds shall we die if we cannot lock a I<recentfile>?
290             Defaults to 600 seconds.
291              
292             =item loopinterval
293              
294             When mirror_loop is called, this accessor can specify how much time
295             every loop shall at least take. If the work of a loop is done before
296             that time has gone, sleeps for the rest of the time. Defaults to
297             arbitrary 42 seconds.
298              
299             =item max_files_per_connection
300              
301             Maximum number of files that are transferred on a single rsync call.
302             Setting it higher means higher performance at the price of holding
303             connections longer and potentially disturbing other users in the pool.
304             Defaults to the arbitrary value 42.
305              
306             =item max_rsync_errors
307              
308             When rsync operations encounter that many errors without any resetting
309             success in between, then we die. Defaults to unlimited. A value of
310             -1 means we run forever ignoring all rsync errors.
311              
312             =item minmax
313              
314             Hashref remembering when we read the recent_events from this file the
315             last time and what the timespan was.
316              
317             =item protocol
318              
319             When the RECENT file format changes, we increment the protocol. We try
320             to support older protocols in later releases.
321              
322             =item remote_host
323              
324             The host we are mirroring from. Leave empty for the local filesystem.
325              
326             =item remote_module
327              
328             Rsync servers have so called modules to separate directory trees from
329             each other. Put here the name of the module under which we are
330             mirroring. Leave empty for local filesystem.
331              
332             =item rsync_options
333              
334             Things like compress, links, times or checksums. Passed in to the
335             File::Rsync object used to run the mirror.
336              
337             =item serializer_suffix
338              
339             Mostly untested accessor. The only well tested format for
340             I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
341             Data::Serializer. But in principle other formats are supported as
342             well. See section SERIALIZERS below.
343              
344             =item sleep_per_connection
345              
346             Sleep that many seconds (floating point OK) after every chunk of rsyncing
347             has finished. Defaults to arbitrary 0.42.
348              
349             =item tempdir
350              
351             Directory to write temporary files to. Must allow rename operations
352             into the tree which usually means it must live on the same partition
353             as the target directory. Defaults to C<< $self->localroot >>.
354              
355             =item ttl
356              
357             Time to live. Number of seconds after which this recentfile must be
358             fetched again from the origin server. Only relevant for slaves.
359             Defaults to arbitrary 24.2 seconds.
360              
361             =item verbose
362              
363             Boolean to turn on a bit of verbosity.
364              
365             =item verboselog
366              
367             Path to the logfile to write verbose progress information to. This is
368             a primitive stop gap solution to get simple verbose logging working.
369             Switching to Log4perl or similar is probably the way to go.
370              
371             =back
372              
373             =cut
374              
375 8     8   5013 use accessors @accessors;
  8         6589  
  8         37  
376              
377             =head1 METHODS
378              
379             =head2 (void) $obj->aggregate( %options )
380              
381             Takes all intervals that are collected in the accessor called
382             aggregator. Sorts them by actual length of the interval.
383             Removes those that are shorter than our own interval. Then merges this
384             object into the next larger object. The merging continues upwards
385             as long as the next I<recentfile> is old enough to warrant a merge.
386              
387             Whether a merge is warranted is decided according to the interval of the
388             previous interval so that larger files are not so often updated as
389             smaller ones. If $options{force} is true, all files get updated.
390              
391             Here is an example to illustrate the behaviour. Given aggregators
392              
393             1h 1d 1W 1M 1Q 1Y Z
394              
395             then
396              
397             1h updates 1d on every call to aggregate()
398             1d updates 1W earliest after 1h
399             1W updates 1M earliest after 1d
400             1M updates 1Q earliest after 1W
401             1Q updates 1Y earliest after 1M
402             1Y updates Z earliest after 1Q
403              
404             Note that all but the smallest recentfile get updated at an arbitrary
405             rate and as such are quite useless on their own.
406              
407             =cut
408              
409             sub aggregate {
410 354     354 1 18092225 my($self, %option) = @_;
411 354         891 my %seen_interval;
412 2932         6199 my @aggs = sort { $a->{secs} <=> $b->{secs} }
413 1870 50       11475 grep { !$seen_interval{$_->{interval}}++ && $_->{secs} >= $self->interval_secs }
414 1870         7478 map { { interval => $_, secs => $self->interval_secs($_)} }
415 354 50       1843 $self->interval, @{$self->aggregator || []};
  354         1589  
416 354         1501 $self->update;
417 354         12393 $aggs[0]{object} = $self;
418 354         1720 AGGREGATOR: for my $i (0..$#aggs-1) {
419 986         2192 my $this = $aggs[$i]{object};
420 986         3378 my $next = $this->_sparse_clone;
421 986         4204 $next->interval($aggs[$i+1]{interval});
422 986         1576 my $want_merge = 0;
423 986 100 100     5126 if ($option{force} || $i == 0) {
424 606         889 $want_merge = 1;
425             } else {
426 380         1005 my $next_rfile = $next->rfile;
427 380 100       6420 if (-e $next_rfile) {
428 320         1095 my $prev = $aggs[$i-1]{object};
429 320         1840 local $^T = time;
430 320         3595 my $next_age = 86400 * -M $next_rfile;
431 320 100       965 if ($next_age > $prev->interval_secs) {
432 55         135 $want_merge = 1;
433             }
434             } else {
435 60         100 $want_merge = 1;
436             }
437             }
438 986 100       1938 if ($want_merge) {
439 721         2441 $next->merge($this);
440 721         19891 $aggs[$i+1]{object} = $next;
441             } else {
442 265         1530 last AGGREGATOR;
443             }
444             }
445             }
446              
447             # collect file size and mtime for all files of this aggregate
448             sub _debug_aggregate {
449 30     30   30815 my($self) = @_;
450 270         450 my @aggs = sort { $a->{secs} <=> $b->{secs} }
451 180         520 map { { interval => $_, secs => $self->interval_secs($_)} }
452 30 50       115 $self->interval, @{$self->aggregator || []};
  30         125  
453 30         80 my $report = [];
454 30         100 for my $i (0..$#aggs) {
455 180         11340 my $this = Storable::dclone $self;
456 180         470 $this->interval($aggs[$i]{interval});
457 180         335 my $rfile = $this->rfile;
458 180         2190 my @stat = stat $rfile;
459 180         1290 push @$report, {rfile => $rfile, size => $stat[7], mtime => $stat[9]};
460             }
461 30         400 $report;
462             }
463              
464             # (void) $self->_assert_symlink()
465             sub _assert_symlink {
466 1646     1646   3093 my($self) = @_;
467 1646         5747 my $recentrecentfile = File::Spec->catfile
468             (
469             $self->localroot,
470             sprintf
471             (
472             "%s.recent",
473             $self->filenameroot
474             )
475             );
476 1646 50       59088 if ($Config{d_symlink} eq "define") {
477 1646         2205 my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
478 1646 100       25783 if (-l $recentrecentfile) {
479 1625         14638 my $found_symlink = readlink $recentrecentfile;
480 1625 100       5726 if ($found_symlink eq $self->rfilename) {
481 1610         3750 return;
482             } else {
483 15         20 $howto_create_symlink = 2;
484             }
485             } else {
486 21         38 $howto_create_symlink = 1;
487             }
488 36 100       104 if (1 == $howto_create_symlink) {
489 21 50       53 symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
490             } else {
491 15         200 unlink "$recentrecentfile.$$"; # may fail
492 15 50       30 symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
493 15 50       760 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
494             }
495             } else {
496 0         0 warn "Warning: symlinks not supported on this system, doing a copy instead\n";
497 0         0 unlink "$recentrecentfile.$$"; # may fail
498 0 0       0 cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
499 0 0       0 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
500             }
501             }
502              
503             =head2 $hashref = $obj->delayed_operations
504              
505             A hash of hashes containing unlink and rmdir operations which had to
506             wait until the recentfile got unhidden in order to not confuse
507             downstream mirrors (in case we have some).
508              
509             =cut
510              
511             sub delayed_operations {
512 41     41 1 115 my($self) = @_;
513 41         276 my $x = $self->_delayed_operations;
514 41 100       504 unless (defined $x) {
515 15         250 $x = {
516             unlink => {},
517             rmdir => {},
518             };
519 15         115 $self->_delayed_operations ($x);
520             }
521 41         379 return $x;
522             }
523              
524             =head2 $done = $obj->done
525              
526             C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
527             object that keeps track of rsync activities. Only needed and used when
528             we are a mirroring slave.
529              
530             =cut
531              
532             sub done {
533 119     119 1 1850 my($self) = @_;
534 119         1583 my $done = $self->_done;
535 119 100       2592 if (!$done) {
536 15         416 require File::Rsync::Mirror::Recentfile::Done;
537 15         577 $done = File::Rsync::Mirror::Recentfile::Done->new();
538 15         177 $done->_rfinterval ($self->interval);
539 15         293 $self->_done ( $done );
540             }
541 119         2226 return $done;
542             }
543              
544             =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
545              
546             Stores the remote I<recentfile> locally as a tempfile. The caller is
547             responsible to remove the file after use.
548              
549             Note: if you're intending to act as an rsync server for other slaves,
550             then you must prefer this method to fetch that file with
551             get_remotefile(). Otherwise downstream mirrors would expect you to
552             already have mirrored all the files that are in the I<recentfile>
553             before you have them mirrored.
554              
555             =cut
556              
557             sub get_remote_recentfile_as_tempfile {
558 74     74 1 302 my($self) = @_;
559 74         693 mkpath $self->localroot;
560 74         7663 my $fh;
561             my $trfilename;
562 74 100       763 if ( $self->_use_tempfile() ) {
563 43 100       841 if ($self->ttl_reached) {
564 10         122 $fh = $self->_current_tempfile_fh;
565 10         184 $trfilename = $self->rfilename;
566             } else {
567 33         291 return $self->_current_tempfile;
568             }
569             } else {
570 31         427 $trfilename = $self->rfilename;
571             }
572              
573 41         160 my $dst;
574 41 50       184 if ($fh) {
575 0         0 $dst = $self->_current_tempfile;
576             } else {
577 41         236 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
578 41         201 $dst = $fh->filename;
579 41         579 $self->_current_tempfile ($dst);
580 41         398 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
  41         200  
581 41 100 66     1364 if (defined $rfile && -e $rfile) {
582             # saving on bandwidth. Might need to be configurable
583             # $self->bandwidth_is_cheap?
584 27 50       459 cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
585             }
586             }
587 41         17530 my $src = join ("/",
588             $self->remoteroot,
589             $trfilename,
590             );
591 41 50       327 if ($self->verbose) {
592 0 0       0 my $doing = -e $dst ? "Sync" : "Get";
593 0         0 my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
594 0         0 my $LFH = $self->_logfilehandle;
595 0         0 printf $LFH
596             (
597             "%-4s %d (1/1/%s) temp %s ... ",
598             $doing,
599             time,
600             $self->interval,
601             $display_dst,
602             );
603             }
604 41         569 my $gaveup = 0;
605 41         250 my $retried = 0;
606 41         971 local($ENV{LANG}) = "C";
607 41         317 while (!$self->rsync->exec(
608             src => $src,
609             dst => $dst,
610             )) {
611 0         0 $self->register_rsync_error ($self->rsync->err);
612 0 0       0 if (++$retried >= 3) {
613 0         0 warn "XXX giving up";
614 0         0 $gaveup = 1;
615 0         0 last;
616             }
617             }
618 41 50       2875561 if ($gaveup) {
619 0         0 my $LFH = $self->_logfilehandle;
620 0         0 printf $LFH "Warning: gave up mirroring %s, will try again later", $self->interval;
621             } else {
622 41         1735 $self->_refresh_internals ($dst);
623 41         1205 $self->have_mirrored (Time::HiRes::time);
624 41         790 $self->un_register_rsync_error ();
625             }
626 41         364 $self->unseed;
627 41 50       311 if ($self->verbose) {
628 0         0 my $LFH = $self->_logfilehandle;
629 0         0 print $LFH "DONE\n";
630             }
631 41         626 my $mode = 0644;
632 41 50       3103 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
633 41         1621 return $dst;
634             }
635              
636             sub _verified_tempdir {
637 41     41   107 my($self) = @_;
638 41         307 my $tempdir = $self->__verified_tempdir();
639 41 100       544 return $tempdir if defined $tempdir;
640 20 50       107 unless ($tempdir = $self->tempdir) {
641 20         178 $tempdir = $self->localroot;
642             }
643 20 50       669 unless (-d $tempdir) {
644 0         0 mkpath $tempdir;
645             }
646 20         103 $self->__verified_tempdir($tempdir);
647 20         226 return $tempdir;
648             }
649              
650             sub _get_remote_rat_provide_tempfile_object {
651 41     41   168 my($self, $trfilename) = @_;
652 41         246 my $_verified_tempdir = $self->_verified_tempdir;
653 41         490 my $fh = File::Temp->new
654             (TEMPLATE => sprintf(".FRMRecent-%s-XXXX",
655             $trfilename,
656             ),
657             DIR => $_verified_tempdir,
658             SUFFIX => $self->serializer_suffix,
659             UNLINK => $self->_use_tempfile,
660             );
661 41         35953 my $mode = 0644;
662 41         328 my $dst = $fh->filename;
663 41 50       1462 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
664 41 100       297 if ($self->_use_tempfile) {
665 10         154 $self->_current_tempfile_fh ($fh); # delay self destruction
666             }
667 41         421 return $fh;
668             }
669              
670             sub _logfilehandle {
671 0     0   0 my($self) = @_;
672 0         0 my $fh;
673 0 0       0 if (my $vl = $self->verboselog) {
674 0 0       0 open $fh, ">>", $vl or die "Could not open >> '$vl': $!";
675             } else {
676 0         0 $fh = \*STDERR;
677             }
678 0         0 return $fh;
679             }
680              
681             =head2 $localpath = $obj->get_remotefile ( $relative_path )
682              
683             Rsyncs one single remote file to local filesystem.
684              
685             Note: no locking is done on this file. Any number of processes may
686             mirror this object.
687              
688             Note II: do not use for recentfiles. If you are a cascading
689             slave/server combination, it would confuse other slaves. They would
690             expect the contents of these recentfiles to be available. Use
691             get_remote_recentfile_as_tempfile() instead.
692              
693             =cut
694              
695             sub get_remotefile {
696 0     0 1 0 my($self, $path) = @_;
697 0         0 my $dst = File::Spec->catfile($self->localroot, $path);
698 0         0 mkpath dirname $dst;
699 0 0       0 if ($self->verbose) {
700 0 0       0 my $doing = -e $dst ? "Sync" : "Get";
701 0         0 my $LFH = $self->_logfilehandle;
702 0         0 printf $LFH
703             (
704             "%-4s %d (1/1/%s) %s ... ",
705             $doing,
706             time,
707             $self->interval,
708             $path,
709             );
710             }
711 0         0 local($ENV{LANG}) = "C";
712 0 0       0 my $remoteroot = $self->remoteroot or die "Alert: missing remoteroot. Cannot continue";
713 0         0 while (!$self->rsync->exec(
714             src => join("/",
715             $remoteroot,
716             $path),
717             dst => $dst,
718             )) {
719 0         0 $self->register_rsync_error ($self->rsync->err);
720             }
721 0         0 $self->un_register_rsync_error ();
722 0 0       0 if ($self->verbose) {
723 0         0 my $LFH = $self->_logfilehandle;
724 0         0 print $LFH "DONE\n";
725             }
726 0         0 return $dst;
727             }
728              
729             =head2 $obj->interval ( $interval_spec )
730              
731             Get/set accessor. $interval_spec is a string and described below in
732             the section INTERVAL SPEC.
733              
734             =cut
735              
736             sub interval {
737 71177     71177 1 140743 my ($self, $interval) = @_;
738 71177 100       150426 if (@_ >= 2) {
739 5060         17389 $self->_interval($interval);
740 5060         34509 $self->_rfile(undef);
741             }
742 71177         204050 $interval = $self->_interval;
743 71177 100       383874 unless (defined $interval) {
744             # do not ask the $self too much, it recurses!
745 1         7 require Carp;
746 1         211 Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
747             }
748 71176         252395 return $interval;
749             }
750              
751             =head2 $secs = $obj->interval_secs ( $interval_spec )
752              
753             $interval_spec is described below in the section INTERVAL SPEC. If
754             empty defaults to the inherent interval for this object.
755              
756             =cut
757              
758             sub interval_secs {
759 25955     25955 1 204414 my ($self, $interval) = @_;
760 25955   66     72192 $interval ||= $self->interval;
761 25954 50       56390 unless (defined $interval) {
762 0         0 die "interval_secs() called without argument on an object without a declared one";
763             }
764 25954 100       140122 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
765             die "Could not determine seconds from interval[$interval]";
766 25953 100 33     183311 if ($interval eq "Z") {
    50          
767 960         3935 return MAX_INT;
768             } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
769 24993         112759 return $seconds{$t}*$n;
770             } else {
771 0         0 die "Invalid interval specification: n[$n]t[$t]";
772             }
773             }
774              
775             =head2 $obj->localroot ( $localroot )
776              
777             Get/set accessor. The local root of the tree. Guaranteed without
778             trailing slash.
779              
780             =cut
781              
782             sub localroot {
783 11503     11503 1 20658 my ($self, $localroot) = @_;
784 11503 100       33537 if (@_ >= 2) {
785 1663         10841 $localroot =~ s|/$||;
786 1663         8945 $self->_localroot($localroot);
787 1663         14731 $self->_rfile(undef);
788             }
789 11503         39319 $localroot = $self->_localroot;
790             }
791              
792             =head2 $ret = $obj->local_path($path_found_in_recentfile)
793              
794             Combines the path to our local mirror and the path of an object found
795             in this I<recentfile>. In other words: the target of a mirror operation.
796              
797             Implementation note: We split on slashes and then use
798             File::Spec::catfile to adjust to the local operating system.
799              
800             =cut
801              
802             sub local_path {
803 1446     1446 1 2713 my($self,$path) = @_;
804 1446 50       4173 unless (defined $path) {
805             # seems like a degenerated case
806 0         0 return $self->localroot;
807             }
808 1446         4416 my @p = split m|/|, $path;
809 1446         4548 File::Spec->catfile($self->localroot,@p);
810             }
811              
812             =head2 (void) $obj->lock
813              
814             Locking is implemented with an C<mkdir> on a locking directory
815             (C<.lock> appended to $rfile).
816              
817             =cut
818              
819             sub lock {
820 3088     3088 1 5332 my ($self) = @_;
821             # not using flock because it locks on filehandles instead of
822             # old school resources.
823 3088 50       10377 my $locked = $self->_is_locked and return;
824 3088         22630 my $rfile = $self->rfile;
825             # XXX need a way to allow breaking the lock
826 3088         6989 my $start = time;
827 3088   50     9418 my $locktimeout = $self->locktimeout || 600;
828 3088         21130 my %have_warned;
829 3088         9694 my $lockdir = "$rfile.lock";
830 3088         6298 my $procfile = "$lockdir/process";
831 3088         803780 GETLOCK: while (not mkdir $lockdir) {
832 0 0       0 if (open my $fh, "<", $procfile) {
833 0         0 chomp(my $process = <$fh>);
834 0 0       0 if (0) {
    0          
    0          
835 0         0 } elsif ($process !~ /^\d+$/) {
836 0 0       0 warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown}++;
837             } elsif ($$ == $process) {
838 0         0 last GETLOCK;
839             } elsif (kill 0, $process) {
840 0 0       0 warn "Warning: process $process holds a lock in '$lockdir', waiting..." unless $have_warned{$process}++;
841             } else {
842 0         0 warn "Warning: breaking lock held by process $process";
843 0         0 sleep 1;
844 0         0 last GETLOCK;
845             }
846             } else {
847 0 0       0 warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown}++;
848             }
849 0         0 Time::HiRes::sleep 0.01;
850 0 0       0 if (time - $start > $locktimeout) {
851 0         0 die "Could not acquire lockdirectory '$rfile.lock': $!";
852             }
853             } # GETLOCK
854 3088 50       219127 open my $fh, ">", $procfile or die "Could not open >$procfile\: $!";
855 3088         38270 print $fh $$, "\n";
856 3088 50       121599 close $fh or die "Could not close: $!";
857 3088         13579 $self->_is_locked (1);
858             }
859              
860             =head2 (void) $obj->merge ($other)
861              
862             Bulk update of this object with another one. It's used to merge a
863             smaller and younger $other object into the current one. If this file
864             is a C<Z> file, then we normally do not merge in objects of type
865             C<delete>; this can be overridden by setting
866             keep_delete_objects_forever. But if we encounter an object of type
867             delete we delete the corresponding C<new> object if we have it.
868              
869             If there is nothing to be merged, nothing is done.
870              
871             =cut
872              
873             sub merge {
874 721     721 1 1345 my($self, $other) = @_;
875 721         2058 $self->_merge_sanitycheck ( $other );
876 721         2596 $other->lock;
877 721   50     6316 my $other_recent = $other->recent_events || [];
878 721         1925 $self->lock;
879 721         6968 $self->_merge_locked ( $other, $other_recent );
880 721         12359 $self->unlock;
881 721         4173 $other->unlock;
882             }
883              
884             sub _merge_locked {
885 721     721   1304 my($self, $other, $other_recent) = @_;
886 721   50     2160 my $my_recent = $self->recent_events || [];
887              
888             # calculate the target time span
889 721 100       2643 my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
890 721 50       1866 my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
891 721         1125 my $oldest_allowed = 0;
892 721         918 my $something_done;
893 721 100       1687 unless ($my_recent->[0]) {
894             # obstetrics
895 75         100 $something_done = 1;
896             }
897 721 50       2019 if ($epoch) {
898 721 100 50     2076 if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
    100 100        
899 233         3680 $oldest_allowed = 0;
900 233         322 $something_done = 1;
901             } elsif (my $merged = $self->merged) {
902 446         1174 my $secs = $self->interval_secs();
903 446   50     4007 $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
904 446 50 33     2993 if (@$other_recent and
905             _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)
906             ) {
907 0         0 $oldest_allowed = $other_recent->[-1]{epoch};
908             }
909             }
910 721   100     3928 while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
911 1386         2268 pop @$my_recent;
912 1386         7033 $something_done = 1;
913             }
914             }
915              
916 721         1542 my %have_path;
917 721         1304 my $other_recent_filtered = [];
918 721         2137 for my $oev (@$other_recent) {
919 23431   50     60489 my $oevepoch = $oev->{epoch} || 0;
920 23431 50       51529 next if _bigfloatlt($oevepoch, $oldest_allowed);
921 23431         43895 my $path = $oev->{path};
922 23431 50       70773 next if $have_path{$path}++;
923 23431 100 100     102666 if ( $self->interval eq "Z"
      66        
924             and $oev->{type} eq "delete"
925             and ! $self->keep_delete_objects_forever
926             ) {
927             # do nothing
928             } else {
929 23404 100 100     70115 if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
930 4710         8113 $something_done = 1;
931             }
932 23404         118121 push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
933             }
934             }
935 721 100       3342 if ($something_done) {
936 679         2415 $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
937             }
938             }
939              
940             sub _merge_something_done {
941 679     679   1273 my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
942 679         1024 my $recent = [];
943 679         900 my $epoch_conflict = 0;
944 679         953 my $last_epoch;
945 679   100     1827 ZIP: while (@$other_recent_filtered || @$my_recent) {
946 52851         51663 my $event;
947 52851 100 100     215726 if (!@$my_recent ||
      66        
948             @$other_recent_filtered && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch})) {
949 22350         30021 $event = shift @$other_recent_filtered;
950             } else {
951 30501         40432 $event = shift @$my_recent;
952 30501 100       141449 next ZIP if $have_path->{$event->{path}}++;
953             }
954 35274 100 100     144514 $epoch_conflict=1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
955 35274         49621 $last_epoch = $event->{epoch};
956 35274         113579 push @$recent, $event;
957             }
958 679 100       1447 if ($epoch_conflict) {
959 10         10 my %have_epoch;
960 10         45 for (my $i = $#$recent;$i>=0;$i--) {
961 270         335 my $epoch = $recent->[$i]{epoch};
962 270 100       1180 if ($have_epoch{$epoch}++) {
963 10         30 while ($have_epoch{$epoch}) {
964 10         40 $epoch = _increase_a_bit($epoch);
965             }
966 10         20 $recent->[$i]{epoch} = $epoch;
967 10         40 $have_epoch{$epoch}++;
968             }
969             }
970             }
971 679 100 100     2066 if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
972 233         3214 $self->dirtymark ( $other->dirtymark );
973             }
974 679         8686 $self->write_recent($recent);
975             $other->merged({
976             time => Time::HiRes::time, # not used anywhere
977             epoch => $recent->[0]{epoch},
978 679         3933 into_interval => $self->interval, # not used anywhere
979             });
980 679         1640 $other->write_recent($other_recent);
981             }
982              
983             sub _merge_sanitycheck {
984 721     721   1308 my($self, $other) = @_;
985 721 50       1725 if ($self->interval_secs <= $other->interval_secs) {
986 0         0 require Carp;
987 0         0 Carp::confess
988             (sprintf
989             (
990             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
991             $self->interval_secs,
992             $other->interval_secs,
993             ));
994             }
995             }
996              
997             =head2 merged
998              
999             Hashref denoting when this recentfile has been merged into some other
1000             at which epoch.
1001              
1002             =cut
1003              
1004             sub merged {
1005 14508     14508 1 28782 my($self, $set) = @_;
1006 14508 100       40943 if (defined $set) {
1007 4542         13011 $self->_merged ($set);
1008             }
1009 14508         63884 my $merged = $self->_merged;
1010 14508         62175 my $into;
1011 14508 100 100     87564 if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
      100        
1012             # sanity checks
1013 9355 50       72814 if ($into eq $self->interval) {
    50          
1014 0         0 require Carp;
1015 0         0 Carp::cluck(sprintf
1016             (
1017             "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
1018             $into,
1019             $self->interval,
1020             ));
1021             } elsif ($self->interval_secs($into) < $self->interval_secs) {
1022 0         0 require Carp;
1023 0         0 Carp::cluck(sprintf
1024             (
1025             "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
1026             $self->interval_secs($into),
1027             $self->interval_secs,
1028             $self->interval,
1029             ));
1030             }
1031             }
1032 14508         45437 $merged;
1033             }
1034              
1035             =head2 $hashref = $obj->meta_data
1036              
1037             Returns the hashref of metadata that the server has to add to the
1038             I<recentfile>.
1039              
1040             =cut
1041              
1042             sub meta_data {
1043 2717     2717 1 4072 my($self) = @_;
1044 2717         4224 my $ret = $self->{meta};
1045 2717         6026 for my $m (
1046             "aggregator",
1047             "canonize",
1048             "comment",
1049             "dirtymark",
1050             "filenameroot",
1051             "interval",
1052             "merged",
1053             "minmax",
1054             "protocol",
1055             "serializer_suffix",
1056             ) {
1057 27170         70870 my $v = $self->$m;
1058 27170 100       114199 if (defined $v) {
1059 23372         63114 $ret->{$m} = $v;
1060             }
1061             }
1062             # XXX need to reset the Producer if I am a writer, keep it when I
1063             # am a reader
1064             $ret->{Producers} ||= {
1065 2717   50     41672 __PACKAGE__, "$VERSION", # stringified it looks better
1066             '$0', $0,
1067             'time', Time::HiRes::time,
1068             };
1069 2717   66     8282 $ret->{dirtymark} ||= Time::HiRes::time;
1070 2717         9582 return $ret;
1071             }
1072              
1073             =head2 $success = $obj->mirror ( %options )
1074              
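1075             Mirrors the files in this I<recentfile> as reported by
1076             C<recent_events>. Options named C<after>, C<before>, C<max> are passed
1077             through to the C<recent_events> call. The boolean option C<piecemeal>,
1078             if true, causes C<mirror> to only rsync C<max_files_per_connection>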
1079             and keep track of the rsynced files so that future calls will rsync
1080             different files until all files are brought to sync.
1081              
1082             =cut
1083              
1084             sub mirror {
1085 32     32 1 5307 my($self, %options) = @_;
1086 32         335 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
1087 32         5279 $self->_use_tempfile (1);
1088             # skip-deletes is inadequate for passthrough within mirror. We
1089             # would never reach uptodateness when a delete were on a
1090             # borderline
1091 32         299 my %passthrough = map { ($_ => $options{$_}) } qw(before after max);
  96         1036  
1092 32         494 my ($recent_events) = $self->recent_events(%passthrough);
1093 32         109 my(@error, @dlcollector); # download-collector: array containing paths we need
1094 32         110 my $first_item = 0;
1095 32         101 my $last_item = $#$recent_events;
1096 32         446 my $done = $self->done;
1097 32         248 my $pathdb = $self->_pathdb;
1098 32         269 ITEM: for my $i ($first_item..$last_item) {
1099 2667         3790 my $status = +{};
1100 2667         7724 $self->_mirror_item
1101             (
1102             $i,
1103             $recent_events,
1104             $last_item,
1105             $done,
1106             $pathdb,
1107             \@dlcollector,
1108             \%options,
1109             $status,
1110             \@error,
1111             );
1112 2667 100       8236 last if $i == $last_item;
1113 2641 100       8333 if ($status->{mustreturn}){
1114 6 100 66     202 if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
1115             # looks like a bug somewhere else
1116 5         298 my $t = $self->_current_tempfile;
1117 5 50       1956 unlink $t or die "Could not unlink '$t': $!";
1118 5         49 $self->_current_tempfile(undef);
1119 5         128 $self->_use_tempfile(0);
1120             }
1121 6         4946 return;
1122             }
1123             }
1124 26 100       134 if (@dlcollector) {
1125 17         73 my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
  17         177  
1126 17 50 33     326 if (!$success || $@) {
1127 0         0 warn "Warning: Unknown error while mirroring: $@";
1128 0         0 push @error, $@;
1129 0         0 sleep 1;
1130             }
1131             }
1132 26 50       275 if ($self->verbose) {
1133 0         0 my $LFH = $self->_logfilehandle;
1134 0         0 print $LFH "DONE\n";
1135             }
1136             # once we've gone to the end we consider ourselves free of obligations
1137 26         409 $self->unseed;
1138 26         227 $self->_mirror_unhide_tempfile ($trecentfile);
1139 26         344 $self->_mirror_perform_delayed_ops(\%options);
1140 26         6157 return !@error;
1141             }
1142              
1143             sub _mirror_item {
1144 2667     2667   5714 my($self,
1145             $i,
1146             $recent_events,
1147             $last_item,
1148             $done,
1149             $pathdb,
1150             $dlcollector,
1151             $options,
1152             $status,
1153             $error,
1154             ) = @_;
1155 2667         3982 my $recent_event = $recent_events->[$i];
1156 2667 100       8212 return if $done->covered ( $recent_event->{epoch} );
1157 1446 100       3117 if ($pathdb) {
1158 811         1923 my $rec = $pathdb->{$recent_event->{path}};
1159 811 50 66     2886 if ($rec && $rec->{recentepoch}) {
1160 256 50       787 if (_bigfloatgt
1161             ( $rec->{recentepoch}, $recent_event->{epoch} )){
1162 0         0 $done->register ($recent_events, [$i]);
1163 0         0 return;
1164             }
1165             }
1166             }
1167 1446         4696 my $dst = $self->local_path($recent_event->{path});
1168 1446 100       30486 if ($recent_event->{type} eq "new"){
    50          
1169 1422         3784 $self->_mirror_item_new
1170             (
1171             $dst,
1172             $i,
1173             $last_item,
1174             $recent_events,
1175             $recent_event,
1176             $dlcollector,
1177             $pathdb,
1178             $status,
1179             $error,
1180             $options,
1181             );
1182             } elsif ($recent_event->{type} eq "delete") {
1183 24         62 my $activity;
1184 24 50       117 if ($options->{'skip-deletes'}) {
1185 0         0 $activity = "skipped";
1186             } else {
1187 24         942 my @lstat = lstat $dst;
1188 24 100 33     511 if (! -e _) {
    50          
1189 9         91 $activity = "not_found";
1190             } elsif (-l _ or not -d _) {
1191 15         120 $self->delayed_operations->{unlink}{$dst}++;
1192 15         95 $activity = "deleted";
1193             } else {
1194 0         0 $self->delayed_operations->{rmdir}{$dst}++;
1195 0         0 $activity = "deleted";
1196             }
1197             }
1198 24         159 $done->register ($recent_events, [$i]);
1199 24 100       158 if ($pathdb) {
1200 9         67 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1201             }
1202             } else {
1203 0         0 warn "Warning: invalid upload type '$recent_event->{type}'";
1204             }
1205             }
1206              
1207             sub _mirror_item_new {
1208 1422     1422   3543 my($self,
1209             $dst,
1210             $i,
1211             $last_item,
1212             $recent_events,
1213             $recent_event,
1214             $dlcollector,
1215             $pathdb,
1216             $status,
1217             $error,
1218             $options,
1219             ) = @_;
1220 1422 50       4121 if ($self->verbose) {
1221 0 0       0 my $doing = -e $dst ? "Sync" : "Get";
1222 0         0 my $LFH = $self->_logfilehandle;
1223             printf $LFH
1224             (
1225             "%-4s %d (%d/%d/%s) %s ... ",
1226             $doing,
1227             time,
1228             1+$i,
1229             1+$last_item,
1230             $self->interval,
1231             $recent_event->{path},
1232 0         0 );
1233             }
1234 1422   50     8011 my $max_files_per_connection = $self->max_files_per_connection || 42;
1235 1422         8866 my $success;
1236 1422 50       4734 if ($self->verbose) {
1237 0         0 my $LFH = $self->_logfilehandle;
1238 0         0 print $LFH "\n";
1239             }
1240 1422         10159 push @$dlcollector, { rev => $recent_event, i => $i };
1241 1422 100       3247 if (@$dlcollector >= $max_files_per_connection) {
1242 11         27 $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
  11         79  
1243 11         103 my $sleep = $self->sleep_per_connection;
1244 11 50       113 $sleep = 0.42 unless defined $sleep;
1245 11         4622018 Time::HiRes::sleep $sleep;
1246 11 100       455 if ($options->{piecemeal}) {
1247 6         209 $status->{mustreturn} = 1;
1248 6         148 return;
1249             }
1250             } else {
1251 1411         3961 return;
1252             }
1253 5 50 33     250 if (!$success || $@) {
1254 0         0 warn "Warning: Error while mirroring: $@";
1255 0         0 push @$error, $@;
1256 0         0 sleep 1;
1257             }
1258 5 50       170 if ($self->verbose) {
1259 0         0 my $LFH = $self->_logfilehandle;
1260 0         0 print $LFH "DONE\n";
1261             }
1262             }
1263              
1264             sub _mirror_dlcollector {
1265 28     28   84 my($self,$xcoll,$pathdb,$recent_events) = @_;
1266 28         98 my $success = $self->mirror_path([map {$_->{rev}{path}} @$xcoll]);
  1422         3109  
1267 28 100       6479 if ($pathdb) {
1268 18         230 $self->_mirror_register_path($pathdb,[map {$_->{rev}} @$xcoll],"rsync");
  802         3813  
1269             }
1270 28         749 $self->done->register($recent_events, [map {$_->{i}} @$xcoll]);
  1422         4891  
1271 28         2460 @$xcoll = ();
1272 28         158 return $success;
1273             }
1274              
1275             sub _mirror_register_path {
1276 27     27   174 my($self,$pathdb,$coll,$activity) = @_;
1277 27         223 my $time = time;
1278 27         181 for my $item (@$coll) {
1279             $pathdb->{$item->{path}} =
1280             {
1281             recentepoch => $item->{epoch},
1282 811         12062 ($activity."_on") => $time,
1283             };
1284             }
1285             }
1286              
1287             sub _mirror_unhide_tempfile {
1288 26     26   100 my($self, $trecentfile) = @_;
1289 26         175 my $rfile = $self->rfile;
1290 26 50       4229 if (rename $trecentfile, $rfile) {
1291             # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
1292             } else {
1293 0         0 require Carp;
1294 0         0 Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
1295             }
1296 26         191 $self->_use_tempfile (0);
1297 26 100       349 if (my $ctfh = $self->_current_tempfile_fh) {
1298 10         320 $ctfh->unlink_on_destroy (0);
1299 10         255 $self->_current_tempfile_fh (undef);
1300             }
1301             }
1302              
1303             sub _mirror_perform_delayed_ops {
1304 26     26   1527 my($self,$options) = @_;
1305 26         201 my $delayed = $self->delayed_operations;
1306 26         97 for my $dst (keys %{$delayed->{unlink}}) {
  26         284  
1307 30 100       1805 unless (unlink $dst) {
1308 15         100 require Carp;
1309 15 50       70 Carp::cluck ( "Warning: Error while unlinking '$dst': $!" ) if $options->{verbose};
1310             }
1311 30 50       17505 if ($self->verbose) {
1312 0         0 my $doing = "Del";
1313 0         0 my $LFH = $self->_logfilehandle;
1314 0         0 printf $LFH
1315             (
1316             "%-4s %d (%s) %s DONE\n",
1317             $doing,
1318             time,
1319             $self->interval,
1320             $dst,
1321             );
1322 0         0 delete $delayed->{unlink}{$dst};
1323             }
1324             }
1325 26         116 for my $dst (sort {length($b) <=> length($a)} keys %{$delayed->{rmdir}}) {
  0         0  
  26         370  
1326 0 0       0 unless (rmdir $dst) {
1327 0         0 require Carp;
1328 0 0       0 Carp::cluck ( "Warning: Error on rmdir '$dst': $!" ) if $options->{verbose};
1329             }
1330 0 0       0 if ($self->verbose) {
1331 0         0 my $doing = "Del";
1332 0         0 my $LFH = $self->_logfilehandle;
1333 0         0 printf $LFH
1334             (
1335             "%-4s %d (%s) %s DONE\n",
1336             $doing,
1337             time,
1338             $self->interval,
1339             $dst,
1340             );
1341 0         0 delete $delayed->{rmdir}{$dst};
1342             }
1343             }
1344             }
1345              
1346             =head2 $success = $obj->mirror_path ( $arrref | $path )
1347              
1348             If the argument is a scalar it is treated as a path. The remote path
1349             is mirrored into the local copy. $path is the path found in the
1350             I<recentfile>, i.e. it is relative to the root directory of the
1351             mirror.
1352              
1353             If the argument is an array reference then all elements are treated as
1354             a path below the current tree and all are rsynced with a single
1355             command (and a single connection).
1356              
1357             =cut
1358              
1359             sub mirror_path {
1360 28     28 1 72 my($self,$path) = @_;
1361             # XXX simplify the two branches such that $path is treated as
1362             # [$path] maybe even demand the argument as an arrayref to
1363             # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1364             # interface)
1365 28 50 33     435 if (ref $path and ref $path eq "ARRAY") {
1366 28         82 my $dst = $self->localroot;
1367 28         4743 mkpath dirname $dst;
1368 28         165 my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
1369             lc $self->filenameroot,
1370             ),
1371             TMPDIR => 1,
1372             UNLINK => 0,
1373             );
1374 28         42721 for my $p (@$path) {
1375 1422         3958 print $fh $p, "\n";
1376             }
1377 28         2084 $fh->flush;
1378 28         144 $fh->unlink_on_destroy(1);
1379 28         290 my $gaveup = 0;
1380 28         55 my $retried = 0;
1381 28         439 local($ENV{LANG}) = "C";
1382 28         181 while (!$self->rsync->exec
1383             (
1384             src => join("/",
1385             $self->remoteroot,
1386             ),
1387             dst => $dst,
1388             'files-from' => $fh->filename,
1389             )) {
1390 0         0 my(@err) = $self->rsync->err;
1391 0 0 0     0 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1392 0 0       0 if ($self->verbose) {
1393 0         0 my $LFH = $self->_logfilehandle;
1394 0         0 print $LFH "Info: ignoring link_stat error '@err'";
1395             }
1396 0         0 return 1;
1397             }
1398 0         0 $self->register_rsync_error (@err);
1399 0 0       0 if (++$retried >= 3) {
1400 0         0 my $batchsize = @$path;
1401 0         0 warn "The number of rsync retries now reached 3 within a batch of size $batchsize. Error was '@err'. Giving up now, will retry later, ";
1402 0         0 $gaveup = 1;
1403 0         0 last;
1404             }
1405 0         0 sleep 1;
1406             }
1407 28 50       3340875 unless ($gaveup) {
1408 28         976 $self->un_register_rsync_error ();
1409             }
1410             } else {
1411 0         0 my $dst = $self->local_path($path);
1412 0         0 mkpath dirname $dst;
1413 0         0 local($ENV{LANG}) = "C";
1414 0         0 while (!$self->rsync->exec
1415             (
1416             src => join("/",
1417             $self->remoteroot,
1418             $path
1419             ),
1420             dst => $dst,
1421             )) {
1422 0         0 my(@err) = $self->rsync->err;
1423 0 0 0     0 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1424 0 0       0 if ($self->verbose) {
1425 0         0 my $LFH = $self->_logfilehandle;
1426 0         0 print $LFH "Info: ignoring link_stat error '@err'";
1427             }
1428 0         0 return 1;
1429             }
1430 0         0 $self->register_rsync_error (@err);
1431             }
1432 0         0 $self->un_register_rsync_error ();
1433             }
1434 28         25326 return 1;
1435             }
1436              
1437             sub _my_ignore_link_stat_errors {
1438 0     0   0 my($self) = @_;
1439 0         0 my $x = $self->ignore_link_stat_errors;
1440 0 0       0 $x = 1 unless defined $x;
1441 0         0 return $x;
1442             }
1443              
1444             sub _my_current_rfile {
1445 6619     6619   13307 my($self) = @_;
1446 6619         10356 my $rfile;
1447 6619 100       17644 if ($self->_use_tempfile) {
1448 33         324 $rfile = $self->_current_tempfile;
1449             }
1450 6619 100 66     44389 unless ($rfile && -s $rfile) {
1451 6586         14142 $rfile = $self->rfile;
1452             }
1453 6619         20884 return $rfile;
1454             }
1455              
1456             =head2 $path = $obj->naive_path_normalize ($path)
1457              
1458             Takes an absolute unix style path as argument and canonicalizes it to
1459             a shorter path if possible, removing things like double slashes or
1460             C and removes references to C<../> directories to get a shorter
1461             unambiguous path. This is used to simplify the code that determines
1462             if a file passed to C is indeed below our C.
1463              
1464             =cut
1465              
1466             sub naive_path_normalize {
1467 1292     1292 1 2161 my($self,$path) = @_;
1468 1292         11885 $path =~ s|/+|/|g;
1469 1292         5211 1 while $path =~ s|/[^/]+/\.\./|/|;
1470 1292         2925 $path =~ s|/$||;
1471 1292         3585 $path;
1472             }
1473              
1474             =head2 $ret = $obj->read_recent_1 ( $data )
1475              
1476             Delegate of C on protocol 1
1477              
1478             =cut
1479              
1480             sub read_recent_1 {
1481 6487     6487 1 9522 my($self, $data) = @_;
1482 6487         16362 return $data->{recent};
1483             }
1484              
1485             =head2 $array_ref = $obj->recent_events ( %options )
1486              
1487             Note: the code relies on the resource being written atomically. We
1488             cannot lock because we may have no write access. If the caller has
1489             write access (eg. aggregate() or update()), it has to care for any
1490             necessary locking and it MUST write atomically.
1491              
1492             If C<$options{after}> is specified, only file events after this
1493             timestamp are returned.
1494              
1495             If C<$options{before}> is specified, only file events before this
1496             timestamp are returned.
1497              
1498             If C<$options{max}> is specified only a maximum of this many most
1499             recent events is returned.
1500              
1501             If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1502             will be returned.
1503              
1504             If C<$options{contains}> is specified the value must be a hash
1505             reference containing a query. The query may contain the keys C<epoch>,
1506             C<path>, and C<type>. Each represents a condition that must be met. If
1507             there is more than one such key, the conditions are ANDed.
1508              
1509             If C<$options{info}> is specified, it must be a hashref. This hashref
1510             will be filled with metadata about the unfiltered recent_events of
1511             this object, in key C<first> there is the first item, in key C<last>
1512             is the last.
1513              
1514             =cut
1515              
1516             sub recent_events {
1517 6597     6597 1 164357 my ($self, %options) = @_;
1518 6597         12787 my $info = $options{info};
1519 6597 100       22570 if ($self->is_slave) {
1520             # XXX seems dubious, might produce tempfiles without removing them?
1521 37         432 $self->get_remote_recentfile_as_tempfile;
1522             }
1523 6597 50       49698 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1524 6597 100       104143 -e $rfile_or_tempfile or return [];
1525 6487         18919 my $suffix = $self->serializer_suffix;
1526 6487         40677 my ($data) = eval {
1527 6487         20586 $self->_try_deserialize
1528             (
1529             $suffix,
1530             $rfile_or_tempfile,
1531             );
1532             };
1533 6487         6727382 my $err = $@;
1534 6487 50 33     40325 if ($err or !$data) {
1535 0         0 return [];
1536             }
1537 6487         8609 my $re;
1538 6487 50       28634 if (reftype $data eq 'ARRAY') { # protocol 0
1539 0         0 $re = $data;
1540             } else {
1541 6487         23415 $re = $self->_recent_events_protocol_x
1542             (
1543             $data,
1544             $rfile_or_tempfile,
1545             );
1546             }
1547 6487 100       13922 return $re unless grep {defined $options{$_}} qw(after before contains max skip-deletes);
  32435         108087  
1548 2005         7150 $self->_recent_events_handle_options ($re, \%options);
1549             }
1550              
1551             # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
1552             sub _recent_events_handle_options {
1553 2005     2005   3140 my($self, $re, $options) = @_;
1554 2005         2925 my $last_item = $#$re;
1555 2005         3770 my $info = $options->{info};
1556 2005 100       5380 if ($info) {
1557 2000         5080 $info->{first} = $re->[0];
1558 2000         4830 $info->{last} = $re->[-1];
1559             }
1560 2005 100       4620 if (defined $options->{after}) {
1561 5 50       100 if ($re->[0]{epoch} > $options->{after}) {
1562 5 50       155 if (
1563             my $f = first
1564 125     125   270 {$re->[$_]{epoch} <= $options->{after}}
1565             0..$#$re
1566             ) {
1567 5         20 $last_item = $f-1;
1568             }
1569             } else {
1570 0         0 $last_item = -1;
1571             }
1572             }
1573 2005         2775 my $first_item = 0;
1574 2005 100       4800 if (defined $options->{before}) {
1575 2000 100       9095 if ($re->[0]{epoch} > $options->{before}) {
1576 1850 100       19445 if (
1577             my $f = first
1578 141305     141305   298810 {$re->[$_]{epoch} < $options->{before}}
1579             0..$last_item
1580             ) {
1581 590         1480 $first_item = $f;
1582             }
1583             } else {
1584 150         1105 $first_item = 0;
1585             }
1586             }
1587 2005 50 66     18925 if (0 != $first_item || -1 != $last_item) {
1588 2005         27795 @$re = splice @$re, $first_item, 1+$last_item-$first_item;
1589             }
1590 2005 50       10700 if ($options->{'skip-deletes'}) {
1591 0         0 @$re = grep { $_->{type} ne "delete" } @$re;
  0         0  
1592             }
1593 2005 50       4840 if (my $contopt = $options->{contains}) {
1594 0         0 my $seen_allowed = 0;
1595 0         0 for my $allow (qw(epoch path type)) {
1596 0 0       0 if (exists $contopt->{$allow}) {
1597 0         0 $seen_allowed++;
1598 0         0 my $v = $contopt->{$allow};
1599 0         0 @$re = grep { $_->{$allow} eq $v } @$re;
  0         0  
1600             }
1601             }
1602 0 0       0 if (keys %$contopt > $seen_allowed) {
1603 0         0 require Carp;
1604 0         0 Carp::confess
1605             (sprintf "unknown query: %s", join ", ", %$contopt);
1606             }
1607             }
1608 2005 50 33     5195 if ($options->{max} && @$re > $options->{max}) {
1609 0         0 @$re = splice @$re, 0, $options->{max};
1610             }
1611 2005         17720 $re;
1612             }
1613              
1614             sub _recent_events_protocol_x {
1615 6487     6487   14081 my($self,
1616             $data,
1617             $rfile_or_tempfile,
1618             ) = @_;
1619 6487         33034 my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
1620             # we may be reading meta for the first time
1621 6487         9265 while (my($k,$v) = each %{$data->{meta}}) {
  68455         520616  
1622 61968 100       140483 if ($k ne lc $k){ # "Producers"
1623 6487         20118 $self->{ORIG}{$k} = $v;
1624 6487         15367 next;
1625             }
1626 55481 100       222693 next if defined $self->$k;
1627 10460         65039 $self->$k($v);
1628             }
1629 6487         21391 my $re = $self->$meth ($data);
1630 6487         10567 my $minmax;
1631 6487 50       132128 if (my @stat = stat $rfile_or_tempfile) {
1632 6487         87134 $minmax = { mtime => $stat[9] };
1633             } else {
1634             # defensive because ABH encountered:
1635              
1636             #### Sync 1239828608 (1/1/Z) temp .../authors/.FRMRecent-RECENT-Z.yaml-
1637             #### Ydr_.yaml ... DONE
1638             #### Cannot stat '/mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-
1639             #### Ydr_.yaml': No such file or directory at /usr/lib/perl5/site_perl/
1640             #### 5.8.8/File/Rsync/Mirror/Recentfile.pm line 1558.
1641             #### unlink0: /mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-Ydr_.yaml is
1642             #### gone already at cpan-pause.pl line 0
1643            
1644 0         0 my $LFH = $self->_logfilehandle;
1645 0         0 print $LFH "Warning (maybe harmless): Cannot stat '$rfile_or_tempfile': $!"
1646             }
1647 6487 50       21760 if (@$re) {
1648 6487         16518 $minmax->{min} = $re->[-1]{epoch};
1649 6487         14776 $minmax->{max} = $re->[0]{epoch};
1650             }
1651 6487         28832 $self->minmax ( $minmax );
1652 6487         43564 return $re;
1653             }
1654              
1655             sub _try_deserialize {
1656 6487     6487   13364 my($self,
1657             $suffix,
1658             $rfile_or_tempfile,
1659             ) = @_;
1660 6487 50       22047 if ($suffix eq ".yaml") {
    0          
1661 6487         46459 require YAML::Syck;
1662 6487         24977 YAML::Syck::LoadFile($rfile_or_tempfile);
1663             } elsif ($HAVE->{"Data::Serializer"}) {
1664             my $serializer = Data::Serializer->new
1665 0         0 ( serializer => $serializers{$suffix} );
1666             my $serialized = do
1667 0         0 {
1668 0 0       0 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1669 0         0 local $/;
1670 0         0 <$fh>;
1671             };
1672 0         0 $serializer->raw_deserialize($serialized);
1673             } else {
1674 0         0 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1675             }
1676             }
1677              
1678             sub _refresh_internals {
1679 41     41   852 my($self, $dst) = @_;
1680 41         2696 my $class = ref $self;
1681 41         4039 my $rfpeek = $class->new_from_file ($dst);
1682 41         429 for my $acc (qw(
1683             _merged
1684             minmax
1685             )) {
1686 82         1393 $self->$acc ( $rfpeek->$acc );
1687             }
1688 41         892 my $old_dirtymark = $self->dirtymark;
1689 41         504 my $new_dirtymark = $rfpeek->dirtymark;
1690 41 100 66     2239 if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
      100        
1691 5         70 $self->done->reset;
1692 5         46 $self->dirtymark ( $new_dirtymark );
1693 5         59 $self->_uptodateness_ever_reached(0);
1694 5         69 $self->seed;
1695             }
1696             }
1697              
1698             =head2 $ret = $obj->rfilename
1699              
1700             Just the basename of our I<recentfile>, composed from C<filenameroot>,
1701             a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-1M.yaml>
1702              
1703             =cut
1704              
1705             sub rfilename {
1706 6682     6682 1 33273 my($self) = @_;
1707 6682         19288 my $file = sprintf("%s-%s%s",
1708             $self->filenameroot,
1709             $self->interval,
1710             $self->serializer_suffix,
1711             );
1712 6682         118061 return $file;
1713             }
1714              
1715             =head2 $str = $self->remote_dir
1716              
1717             The directory we are mirroring from.
1718              
1719             =cut
1720              
1721             sub remote_dir {
1722 15     15 1 85 my($self, $set) = @_;
1723 15 100       55 if (defined $set) {
1724 5         35 $self->_remote_dir ($set);
1725             }
1726 15         75 my $x = $self->_remote_dir;
1727 15         95 $self->is_slave (1);
1728 15         100 return $x;
1729             }
1730              
1731             =head2 $str = $obj->remoteroot
1732              
1733             =head2 (void) $obj->remoteroot ( $set )
1734              
1735             Get/Set the composed prefix needed when rsyncing from a remote module.
1736             If remote_host, remote_module, and remote_dir are set, it is composed
1737             from these.
1738              
1739             =cut
1740              
1741             sub remoteroot {
1742 83     83 1 288 my($self, $set) = @_;
1743 83 100       539 if (defined $set) {
1744 14         88 $self->_remoteroot($set);
1745             }
1746 83         709 my $remoteroot = $self->_remoteroot;
1747 83 100       973 unless (defined $remoteroot) {
1748 5 50       35 $remoteroot = sprintf
    50          
    50          
1749             (
1750             "%s%s%s",
1751             defined $self->remote_host ? ($self->remote_host."::") : "",
1752             defined $self->remote_module ? ($self->remote_module."/") : "",
1753             defined $self->remote_dir ? $self->remote_dir : "",
1754             );
1755 5         20 $self->_remoteroot($remoteroot);
1756             }
1757 83         739 return $remoteroot;
1758             }
1759              
1760             =head2 (void) $obj->split_rfilename ( $recentfilename )
1761              
1762             Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1763             of the pattern
1764              
1765             $filenameroot-$interval$serializer_suffix
1766              
1767             e.g.
1768              
1769             RECENT-1M.yaml
1770              
1771             This filename is split into its parts and the parts are fed to the
1772             object itself.
1773              
1774             =cut
1775              
1776             sub split_rfilename {
1777 5     5 1 15 my($self, $rfname) = @_;
1778 5         20 my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
1779 5 50       70 if (my($f,$i,$s) = $rfname =~ $splitter) {
1780 5         20 $self->filenameroot ($f);
1781 5         45 $self->interval ($i);
1782 5         15 $self->serializer_suffix ($s);
1783             } else {
1784 0         0 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1785             }
1786 5         35 return;
1787             }
1788              
1789             =head2 my $rfile = $obj->rfile
1790              
1791             Returns the full path of the I<recentfile>
1792              
1793             =cut
1794              
1795             sub rfile {
1796 16106     16106 1 22049 my($self) = @_;
1797 16106         40469 my $rfile = $self->_rfile;
1798 16106 100       101130 return $rfile if defined $rfile;
1799 4980         11278 $rfile = File::Spec->catfile
1800             ($self->localroot,
1801             $self->rfilename,
1802             );
1803 4980         20870 $self->_rfile ($rfile);
1804 4980         26569 return $rfile;
1805             }
1806              
1807             =head2 $rsync_obj = $obj->rsync
1808              
1809             The File::Rsync object that this object uses for communicating with an
1810             upstream server.
1811              
1812             =cut
1813              
1814             sub rsync {
1815 69     69 1 198 my($self) = @_;
1816 69         559 my $rsync = $self->_rsync;
1817 69 100       623 unless (defined $rsync) {
1818 26   50     160 my $rsync_options = $self->rsync_options || {};
1819 26 50       414 if ($HAVE->{"File::Rsync"}) {
1820 26         744 $rsync = File::Rsync->new($rsync_options);
1821 26         118542 $self->_rsync($rsync);
1822             } else {
1823 0         0 die "File::Rsync required for rsync operations. Cannot continue";
1824             }
1825             }
1826 69         865 return $rsync;
1827             }
1828              
1829             =head2 (void) $obj->register_rsync_error(@err)
1830              
1831             =head2 (void) $obj->un_register_rsync_error()
1832              
1833             Register_rsync_error is called whenever the File::Rsync object fails
1834             on an exec (say, connection doesn't succeed). It issues a warning and
1835             sleeps for an increasing amount of time. Un_register_rsync_error
1836             resets the error count. See also accessor C<max_rsync_errors>.
1837              
1838             =cut
1839              
1840             {
1841             my $no_success_count = 0;
1842             my $no_success_time = 0;
1843             sub register_rsync_error {
1844 0     0 1 0 my($self, @err) = @_;
1845 0         0 chomp @err;
1846 0         0 $no_success_time = time;
1847 0         0 $no_success_count++;
1848 0         0 my $max_rsync_errors = $self->max_rsync_errors;
1849 0 0       0 $max_rsync_errors = MAX_INT unless defined $max_rsync_errors;
1850 0 0 0     0 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1851 0         0 require Carp;
1852 0         0 Carp::confess
1853             (
1854             sprintf
1855             (
1856             "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1857             $self->interval,
1858             join(" ",@err),
1859             $no_success_count,
1860             ));
1861             }
1862 0         0 my $sleep = 12 * $no_success_count;
1863 0 0       0 $sleep = 300 if $sleep > 300;
1864 0         0 require Carp;
1865 0         0 Carp::cluck
1866             (sprintf
1867             (
1868             "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1869             scalar(localtime($no_success_time)),
1870             $self->interval,
1871             join(" ",@err),
1872             $sleep,
1873             ));
1874 0         0 sleep $sleep
1875             }
1876             sub un_register_rsync_error {
1877 69     69 1 390 my($self) = @_;
1878 69         331 $no_success_time = 0;
1879 69         2278 $no_success_count = 0;
1880             }
1881             }
1882              
1883             =head2 $clone = $obj->_sparse_clone
1884              
1885             Clones just as much from itself that it does not hurt. Experimental
1886             method.
1887              
1888             Note: what fits better: sparse or shallow? Other suggestions?
1889              
1890             =cut
1891              
1892             sub _sparse_clone {
1893 3226     3226   5184 my($self) = @_;
1894 3226         13309 my $new = bless {}, ref $self;
1895 3226         7740 for my $m (qw(
1896             _interval
1897             _localroot
1898             _remoteroot
1899             _rfile
1900             _use_tempfile
1901             aggregator
1902             filenameroot
1903             ignore_link_stat_errors
1904             is_slave
1905             max_files_per_connection
1906             protocol
1907             rsync_options
1908             serializer_suffix
1909             sleep_per_connection
1910             tempdir
1911             verbose
1912             )) {
1913 51616         286552 my $o = $self->$m;
1914 51616 100       361568 $o = Storable::dclone $o if ref $o;
1915 51616         123343 $new->$m($o);
1916             }
1917 3226         22418 $new;
1918             }
1919              
1920             =head2 $boolean = OBJ->ttl_reached ()
1921              
1922             =cut
1923              
1924             sub ttl_reached {
1925 43     43 1 144 my($self) = @_;
1926 43   100     289 my $have_mirrored = $self->have_mirrored || 0;
1927 43         729 my $now = Time::HiRes::time;
1928 43         430 my $ttl = $self->ttl;
1929 43 50       495 $ttl = 24.2 unless defined $ttl;
1930 43 100       549 if ($now > $have_mirrored + $ttl) {
1931 10         97 return 1;
1932             }
1933 33         332 return 0;
1934             }
1935              
1936             =head2 (void) $obj->unlock()
1937              
1938             Unlocking is implemented with an C<rmdir> on a locking directory
1939             (C<.lock> appended to $rfile).
1940              
1941             =cut
1942              
1943             sub unlock {
1944 8286     8286 1 20485 my($self) = @_;
1945 8286 100       37095 return unless $self->_is_locked;
1946 3088         17675 my $rfile = $self->rfile;
1947 3088 50       203410 unlink "$rfile.lock/process" or warn "Could not unlink lockfile '$rfile.lock/process': $!";
1948 3088 50       271689 rmdir "$rfile.lock" or warn "Could not rmdir lockdir '$rfile.lock': $!";;
1949 3088         11140 $self->_is_locked (0);
1950             }
1951              
1952             =head2 unseed
1953              
1954             Sets this recentfile in the state of not 'seeded'.
1955              
1956             =cut
1957             sub unseed {
1958 67     67 1 256 my($self) = @_;
1959 67         761 $self->seeded(0);
1960             }
1961              
1962             =head2 $ret = $obj->update ($path, $type)
1963              
1964             =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1965              
1966             =head2 $ret = $obj->update ()
1967              
1968             Enter one file into the local I<recentfile>. $path is the (usually
1969             absolute) path. If the path is outside our C<localroot> tree, then it is
1970             ignored.
1971              
1972             C<$type> is one of C<new> or C<delete>.
1973              
1974             Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1975             not used and the epoch is calculated by the update() routine itself
1976             based on current time. But if there is the demand to insert a
1977             not-so-current file into the dataset, then the caller sets
1978             $dirty_epoch. This causes the epoch of the registered event to become
1979             $dirty_epoch or -- if the exact value given is already taken -- a tiny
1980             bit more. As compensation the dirtymark of the whole dataset is set to
1981             now or the current epoch, whichever is higher. Note: setting the
1982             dirty_epoch to the future is prohibited as it's very unlikely to be
1983             intended: it definitely might wreak havoc with the index files.
1984              
1985             The new file event is unshifted (or, if dirty_epoch is set, inserted
1986             at the place it belongs to, according to the rule to have a sequence
1987             of strictly decreasing timestamps) to the array of recent_events and
1988             the array is shortened to the length of the timespan allowed. This is
1989             usually the timespan specified by the interval of this recentfile but
1990             as long as this recentfile has not been merged to another one, the
1991             timespan may grow without bounds.
1992              
1993             The third form runs an update without inserting a new file. This may
1994             be desired to truncate a recentfile.
1995              
1996             =cut
1997             sub _epoch_monotonically_increasing {
1998 1614     1614   2837 my($self,$epoch,$recent) = @_;
1999 1614 100       4382 return $epoch unless @$recent; # the first one goes unoffended
2000 1579 100       25603 if (_bigfloatgt("".$epoch,$recent->[0]{epoch})) {
2001 1414         3590 return $epoch;
2002             } else {
2003 165         580 return _increase_a_bit($recent->[0]{epoch});
2004             }
2005             }
2006             sub update {
2007 1646     1646 1 416069 my($self,$path,$type,$dirty_epoch) = @_;
2008 1646 50 66     10715 if (defined $path or defined $type or defined $dirty_epoch) {
      66        
2009 1292 50       4457 die "update called without path argument" unless defined $path;
2010 1292 50       3983 die "update called without type argument" unless defined $type;
2011 1292 50       17371 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
2012             }
2013 1646         7053 $self->lock;
2014 1646         31416 my $ctx = $self->_locked_batch_update([{path=>$path,type=>$type,epoch=>$dirty_epoch}]);
2015 1646 100       11946 $self->write_recent($ctx->{recent}) if $ctx->{something_done};
2016 1646         6776 $self->_assert_symlink;
2017 1646         7746 $self->unlock;
2018             }
2019              
2020             =head2 $obj->batch_update($batch)
2021              
2022             Like update but for many files. $batch is an arrayref containing
2023             hashrefs with the structure
2024              
2025             {
2026             path => $path,
2027             type => $type,
2028             epoch => $epoch,
2029             }
2030              
2031              
2032              
2033             =cut
2034             sub batch_update {
2035 0     0 1 0 my($self,$batch) = @_;
2036 0         0 $self->lock;
2037 0         0 my $ctx = $self->_locked_batch_update($batch);
2038 0 0       0 $self->write_recent($ctx->{recent}) if $ctx->{something_done};
2039 0         0 $self->_assert_symlink;
2040 0         0 $self->unlock;
2041             }
2042             sub _locked_batch_update {
2043 1646     1646   3619 my($self,$batch) = @_;
2044 1646         3300 my $something_done = 0;
2045 1646         21394 my $recent = $self->recent_events;
2046 1646 100       5132 unless ($recent->[0]) {
2047             # obstetrics
2048 35         50 $something_done = 1;
2049             }
2050 1646         4342 my %paths_in_recent = map { $_->{path} => undef } @$recent;
  59697         154082  
2051 1646         10521 my $interval = $self->interval;
2052 1646         4798 my $canonmeth = $self->canonize;
2053 1646 100       9219 unless ($canonmeth) {
2054 390         620 $canonmeth = "naive_path_normalize";
2055             }
2056 1646         2808 my $oldest_allowed = 0;
2057 1646         2941 my $setting_new_dirty_mark = 0;
2058 1646         2589 my $console;
2059 1646 50 66     6065 if ($self->verbose && @$batch > 1) {
2060 0         0 eval {require Time::Progress};
  0         0  
2061 0 0       0 warn "dollarat[$@]" if $@;
2062 0         0 $| = 1;
2063 0         0 $console = new Time::Progress;
2064 0         0 $console->attr( min => 1, max => scalar @$batch );
2065 0         0 print "\n";
2066             }
2067 1646         9832 my $i = 0;
2068 1646         2272 my $memo_splicepos;
2069 1646   0     6883 ITEM: for my $item (sort {($b->{epoch}||0) <=> ($a->{epoch}||0)} @$batch) {
  0   0     0  
2070 1646         3289 $i++;
2071 1646 50 33     4586 print $console->report( "\rdone %p elapsed: %L (%l sec), ETA %E (%e sec)", $i ) if $console and not $i % 50;
2072 1646         7121 my $ctx = $self->_update_batch_item($item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,\%paths_in_recent,$memo_splicepos);
2073 1646         3962 $something_done = $ctx->{something_done};
2074 1646         2749 $oldest_allowed = $ctx->{oldest_allowed};
2075 1646         3211 $setting_new_dirty_mark = $ctx->{setting_new_dirty_mark};
2076 1646         3784 $recent = $ctx->{recent};
2077 1646         9006 $memo_splicepos = $ctx->{memo_splicepos};
2078             }
2079 1646 50       4772 print "\n" if $console;
2080 1646 100       3745 if ($setting_new_dirty_mark) {
2081 32         59 $oldest_allowed = 0;
2082             }
2083 1646         3727 TRUNCATE: while (@$recent) {
2084 2650 100       9964 if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
2085 1004         1829 pop @$recent;
2086 1004         3997 $something_done = 1;
2087             } else {
2088 1646         4217 last TRUNCATE;
2089             }
2090             }
2091 1646         20189 return {something_done=>$something_done,recent=>$recent};
2092             }
2093             sub _update_batch_item {
2094 1646     1646   4992 my($self,$item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,$paths_in_recent,$memo_splicepos) = @_;
2095 1646         3251 my($path,$type,$dirty_epoch) = @{$item}{qw(path type epoch)};
  1646         6072  
2096 1646 50 66     8531 if (defined $path or defined $type or defined $dirty_epoch) {
      66        
2097 1292         4339 $path = $self->$canonmeth($path);
2098             }
2099             # you must calculate the time after having locked, of course
2100 1646         5817 my $now = Time::HiRes::time;
2101              
2102 1646         2199 my $epoch;
2103 1646 100 66     5222 if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
2104 32         55 $epoch = $dirty_epoch;
2105             } else {
2106 1614         5738 $epoch = $self->_epoch_monotonically_increasing($now,$recent);
2107             }
2108 1646   50     5383 $recent ||= [];
2109 1646         4477 my $merged = $self->merged;
2110 1646 100 66     10872 if ($merged->{epoch} && !$setting_new_dirty_mark) {
2111 877         4663 my $virtualnow = _bigfloatmax($now,$epoch);
2112             # for the lower bound I think we need no big math, we calc already
2113 877         3399 my $secs = $self->interval_secs();
2114 877         8080 $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
2115             } else {
2116             # as long as we are not merged at all, no limits!
2117             }
2118 1646         5500 my $lrd = $self->localroot;
2119 1646 100 66     26824 if (defined $path && $path =~ s|^\Q$lrd\E||) {
2120 1292         4971 $path =~ s|^/||;
2121 1292         2278 my $splicepos;
2122             # remove the older duplicates of this $path, irrespective of $type:
2123 1292 100       2995 if (defined $dirty_epoch) {
2124 32         153 my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch,$paths_in_recent,$memo_splicepos);
2125 32         86 $recent = $ctx->{recent};
2126 32         69 $splicepos = $ctx->{splicepos};
2127 32         66 $epoch = $ctx->{epoch};
2128 32         98 my $dirtymark = $self->dirtymark;
2129 32         152 my $new_dm = $now;
2130 32 50       91 if (_bigfloatgt($epoch, $now)) { # just in case we had to increase it
2131 0         0 $new_dm = $epoch;
2132             }
2133 32         113 $self->dirtymark($new_dm);
2134 32         127 $setting_new_dirty_mark = 1;
2135 32 50 33     192 if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
2136 32         88 $self->merged(+{});
2137             }
2138             } else {
2139 1260         2514 $recent = [ grep { $_->{path} ne $path } @$recent ];
  45916         100717  
2140 1260         2570 $splicepos = 0;
2141             }
2142 1292 50       4126 if (defined $splicepos) {
2143 1292         9745 splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
2144 1292         4241 $paths_in_recent->{$path} = undef;
2145             }
2146 1292         2953 $memo_splicepos = $splicepos;
2147 1292         2334 $something_done = 1;
2148             }
2149             return
2150             {
2151 1646         14198 something_done => $something_done,
2152             oldest_allowed => $oldest_allowed,
2153             setting_new_dirty_mark => $setting_new_dirty_mark,
2154             recent => $recent,
2155             memo_splicepos => $memo_splicepos,
2156             }
2157             }
2158             sub _update_with_dirty_epoch {
2159 32     32   78 my($self,$path,$recent,$epoch,$paths_in_recent,$memo_splicepos) = @_;
2160 32         59 my $splicepos;
2161 32         71 my $new_recent = [];
2162 32 50       147 if (exists $paths_in_recent->{$path}) {
2163 0         0 my $cancel = 0;
2164 0         0 KNOWN_EVENT: for my $i (0..$#$recent) {
2165 0 0       0 if ($recent->[$i]{path} eq $path) {
2166 0 0       0 if ($recent->[$i]{epoch} eq $epoch) {
2167             # nothing to do
2168 0         0 $cancel = 1;
2169 0         0 last KNOWN_EVENT;
2170             }
2171             } else {
2172 0         0 push @$new_recent, $recent->[$i];
2173             }
2174             }
2175 0 0       0 @$recent = @$new_recent unless $cancel;
2176             }
2177 32 50 33     183 if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
    50          
2178 0         0 $splicepos = 0;
2179             } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
2180 32         59 $splicepos = @$recent;
2181             } else {
2182 0         0 my $startingpoint;
2183 0 0 0     0 if (_bigfloatgt($memo_splicepos<=$#$recent && $epoch, $recent->[$memo_splicepos]{epoch})) {
2184 0         0 $startingpoint = 0;
2185             } else {
2186 0         0 $startingpoint = $memo_splicepos;
2187             }
2188 0         0 RECENT: for my $i ($startingpoint..$#$recent) {
2189 0         0 my $ev = $recent->[$i];
2190 0 0       0 if ($epoch eq $recent->[$i]{epoch}) {
2191 0 0       0 $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
2192             }
2193 0 0       0 if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
2194 0         0 $splicepos = $i;
2195 0         0 last RECENT;
2196             }
2197             }
2198             }
2199             return {
2200 32         311 recent => $recent,
2201             splicepos => $splicepos,
2202             epoch => $epoch,
2203             }
2204             }
2205              
2206             =head2 seed
2207              
2208             Sets this recentfile in the state of 'seeded' which means it has to
2209             re-evaluate its uptodateness.
2210              
2211             =cut
2212             sub seed {
2213 28     28 1 95 my($self) = @_;
2214 28         194 $self->seeded(1);
2215             }
2216              
2217             =head2 seeded
2218              
2219             Tells if the recentfile is in the state 'seeded'.
2220              
2221             =cut
2222             sub seeded {
2223 134     134 1 781 my($self, $set) = @_;
2224 134 100       951 if (defined $set) {
2225 95         651 $self->_seeded ($set);
2226             }
2227 134         1129 my $x = $self->_seeded;
2228 134 100       1225 unless (defined $x) {
2229 8         29 $x = 0;
2230 8         47 $self->_seeded ($x);
2231             }
2232 134         1066 return $x;
2233             }
2234              
2235             =head2 uptodate
2236              
2237             True if this object has mirrored the complete interval covered by the
2238             current recentfile.
2239              
2240             =cut
2241             sub uptodate {
2242 56     56 1 192 my($self) = @_;
2243 56         183 my $uptodate;
2244             my $why;
2245 56 100 66     404 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2246 19         119 $why = "saturated";
2247 19         74 $uptodate = 1;
2248             }
2249             # it's too easy to misconfigure ttl and related timings and then
2250             # never reach uptodateness, so disabled 2009-03-22
2251 56         475 if (0 and not defined $uptodate) {
2252             if ($self->ttl_reached){
2253             $why = "ttl_reached returned true, so we are not uptodate";
2254             $uptodate = 0 ;
2255             }
2256             }
2257 56 100       398 unless (defined $uptodate) {
2258             # look if recentfile has unchanged timestamp
2259 37         253 my $minmax = $self->minmax;
2260 37 100       487 if (exists $minmax->{mtime}) {
2261 21         235 my $rfile = $self->_my_current_rfile;
2262 21         674 my @stat = stat $rfile;
2263 21 50       166 if (@stat) {
2264 21         109 my $mtime = $stat[9];
2265 21 50 33     934 if (defined $mtime && defined $minmax->{mtime} && $mtime > $minmax->{mtime}) {
      33        
2266 0         0 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
2267 0         0 $uptodate = 0;
2268             } else {
2269 21         144 my $covered = $self->done->covered(@$minmax{qw(max min)});
2270 21 50       446 $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
2271 21         140 $uptodate = $covered;
2272             }
2273             } else {
2274 0         0 require Carp;
2275 0         0 $why = "Could not stat '$rfile': $!";
2276 0         0 Carp::cluck($why);
2277 0         0 $uptodate = 0;
2278             }
2279             }
2280             }
2281 56 100       307 unless (defined $uptodate) {
2282 16         45 $why = "fallthrough, so not uptodate";
2283 16         52 $uptodate = 0;
2284             }
2285 56 100       253 if ($uptodate) {
2286 34         177 $self->_uptodateness_ever_reached(1);
2287             }
2288 56         565 my $remember =
2289             {
2290             uptodate => $uptodate,
2291             why => $why,
2292             };
2293 56         447 $self->_remember_last_uptodate_call($remember);
2294 56         957 return $uptodate;
2295             }
2296              
2297             =head2 $obj->write_recent ($recent_files_arrayref)
2298              
2299             Writes a I<recentfile> based on the current reflection of the current
2300             state of the tree limited by the current interval.
2301              
2302             =cut
2303             sub _resort {
2304 0     0   0 my($self) = @_;
2305 0         0 @{$_[1]} = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @{$_[1]};
  0         0  
  0         0  
  0         0  
2306 0         0 return;
2307             }
2308             sub write_recent {
2309 2717     2717 1 375044 my ($self,$recent) = @_;
2310 2717 50       5695 die "write_recent called without argument" unless defined $recent;
2311 2717         3394 my $Last_epoch;
2312 2717         9100 SANITYCHECK: for my $i (0..$#$recent) {
2313 105193 50 66     365654 if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
2314 0         0 require Carp;
2315             Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
2316 0         0 $recent->[$i]{epoch}, $Last_epoch, $self->interval);
2317             # you may want to:
2318             # $self->_resort($recent);
2319             # last SANITYCHECK;
2320             }
2321 105193         220918 $Last_epoch = $recent->[$i]{epoch};
2322             }
2323 2717         9939 my $minmax = $self->minmax;
2324 2717 100 100     21517 if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
2325 1668 50 33     9682 $minmax->{max} = @$recent && exists $recent->[0]{epoch} ? $recent->[0]{epoch} : undef;
2326             }
2327 2717 100 100     12551 if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
2328 568 50 33     3140 $minmax->{min} = @$recent && exists $recent->[-1]{epoch} ? $recent->[-1]{epoch} : undef;
2329             }
2330 2717         8125 $self->minmax($minmax);
2331 2717         14601 my $meth = sprintf "write_%d", $self->protocol;
2332 2717         24211 $self->$meth($recent);
2333             }
2334              
2335             =head2 $obj->write_0 ($recent_files_arrayref)
2336              
2337             Delegate of C<write_recent> on protocol 0
2338              
2339             =cut
2340              
2341             sub write_0 {
2342 0     0 1 0 my ($self,$recent) = @_;
2343 0         0 my $rfile = $self->rfile;
2344 0         0 YAML::Syck::DumpFile("$rfile.new",$recent);
2345 0 0       0 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2346             }
2347              
2348             =head2 $obj->write_1 ($recent_files_arrayref)
2349              
2350             Delegate of C<write_recent> on protocol 1
2351              
2352             =cut
2353              
2354             sub write_1 {
2355 2717     2717 1 4543 my ($self,$recent) = @_;
2356 2717         6214 my $rfile = $self->rfile;
2357 2717         7044 my $suffix = $self->serializer_suffix;
2358 2717         31782 my $data = {
2359             meta => $self->meta_data,
2360             recent => $recent,
2361             };
2362 2717         5197 my $serialized;
2363 2717 100       7745 if ($suffix eq ".yaml") {
    50          
2364 2702         11141 $serialized = YAML::Syck::Dump($data);
2365             } elsif ($HAVE->{"Data::Serializer"}) {
2366             my $serializer = Data::Serializer->new
2367 15         110 ( serializer => $serializers{$suffix} );
2368 15         15905 $serialized = $serializer->raw_serialize($data);
2369             } else {
2370 0         0 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
2371             }
2372 2717 50       1506219 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2373 2717         82842 print $fh $serialized;
2374 2717 50       136043 close $fh or die "Could not close '$rfile.new': $!";
2375 2717 50       416589 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2376             }
2377              
2378             BEGIN {
2379 8     8   93642 my $nq = qr/[^"]+/; # non-quotes
2380 8         73 my @pod_lines =
2381 8         21 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
  32         335  
  32         845  
  136         361  
2382              
2383             =head1 SERIALIZERS
2384              
2385             The following suffixes are supported and trigger the use of these
2386             serializers:
2387              
2388             =over 4
2389              
2390             =item C<< ".yaml" => "YAML::Syck" >>
2391              
2392             =item C<< ".json" => "JSON" >>
2393              
2394             =item C<< ".sto" => "Storable" >>
2395              
2396             =item C<< ".dd" => "Data::Dumper" >>
2397              
2398             =back
2399              
2400             =cut
2401              
2402             BEGIN {
2403 8     8   96 my @pod_lines =
2404 8         20 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
  64         2493  
  240         515  
2405              
2406             =head1 INTERVAL SPEC
2407              
2408             An interval spec is a primitive way to express time spans. Normally it
2409             is composed from an integer and a letter.
2410              
2411             As a special case, a string that consists only of the single letter
2412             C<Z>, stands for MAX_INT seconds.
2413              
2414             The following letters express the specified number of seconds:
2415              
2416             =over 4
2417              
2418             =item C<< s => 1 >>
2419              
2420             =item C<< m => 60 >>
2421              
2422             =item C<< h => 60*60 >>
2423              
2424             =item C<< d => 60*60*24 >>
2425              
2426             =item C<< W => 60*60*24*7 >>
2427              
2428             =item C<< M => 60*60*24*30 >>
2429              
2430             =item C<< Q => 60*60*24*90 >>
2431              
2432             =item C<< Y => 60*60*24*365.25 >>
2433              
2434             =back
2435              
2436             =cut
2437              
2438             =head1 SEE ALSO
2439              
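2440             L<File::Rsync::Mirror::Recent>,
2441             L<File::Rsync::Mirror::Recentfile::Done>,
2442             L<File::Rsync::Mirror::Recentfile::FakeBigFloat>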
2443              
2444             =head1 BUGS
2445              
2446             Please report any bugs or feature requests through the web interface
2447             at
2448             L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2449             I will be notified, and then you'll automatically be notified of
2450             progress on your bug as I make changes.
2451              
2452             =head1 KNOWN BUGS
2453              
2454             Memory hungry: it seems all memory is allocated during the initial
2455             rsync where a list of all files is maintained in memory.
2456              
2457             =head1 SUPPORT
2458              
2459             You can find documentation for this module with the perldoc command.
2460              
2461             perldoc File::Rsync::Mirror::Recentfile
2462              
2463             You can also look for information at:
2464              
2465             =over 4
2466              
2467             =item * RT: CPAN's request tracker
2468              
2469             L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2470              
2471             =item * AnnoCPAN: Annotated CPAN documentation
2472              
2473             L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2474              
2475             =item * CPAN Ratings
2476              
2477             L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2478              
2479             =item * Search CPAN
2480              
2481             L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2482              
2483             =back
2484              
2485              
2486             =head1 ACKNOWLEDGEMENTS
2487              
2488             Thanks to RJBS for module-starter.
2489              
2490             =head1 AUTHOR
2491              
2492             Andreas König
2493              
2494             =head1 COPYRIGHT & LICENSE
2495              
2496             Copyright 2008,2009 Andreas König.
2497              
2498             This program is free software; you can redistribute it and/or modify it
2499             under the same terms as Perl itself.
2500              
2501              
2502             =cut
2503              
2504             1; # End of File::Rsync::Mirror::Recentfile
2505              
2506             # Local Variables:
2507             # mode: cperl
2508             # cperl-indent-level: 4
2509             # End: