File Coverage

blib/lib/File/Rsync/Mirror/Recentfile.pm
Criterion Covered Total %
statement 778 1008 77.1
branch 293 484 60.5
condition 108 184 58.7
subroutine 80 87 91.9
pod 39 39 100.0
total 1298 1802 72.0


line stmt bran cond sub pod time code
1             package File::Rsync::Mirror::Recentfile;
2              
3             # use warnings;
4 9     9   24372 use strict;
  9         11  
  9         475  
5              
6             =encoding utf-8
7              
8             =head1 NAME
9              
10             File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
11              
12             =cut
13              
14             my $HAVE = {};
15             for my $package (
16             "Data::Serializer",
17             "File::Rsync"
18             ) {
19             $HAVE->{$package} = eval qq{ require $package; };
20             }
21 9     9   30 use Config;
  9         10  
  9         319  
22 9     9   28 use File::Basename qw(basename dirname fileparse);
  9         9  
  9         525  
23 9     9   877 use File::Copy qw(cp);
  9         6621  
  9         327  
24 9     9   37 use File::Path qw(mkpath);
  9         3  
  9         301  
25 9     9   2931 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
  9         9  
  9         1418  
26 9     9   5815 use File::Temp;
  9         128831  
  9         641  
27 9     9   68 use List::Util qw(first max min);
  9         11  
  9         928  
28 9     9   49 use Scalar::Util qw(reftype);
  9         16  
  9         336  
29 9     9   4867 use Storable;
  9         21241  
  9         474  
30 9     9   4141 use Time::HiRes qw();
  9         9536  
  9         205  
31 9     9   3665 use YAML::Syck;
  9         13465  
  9         489  
32              
33 9     9   47 use version; our $VERSION = qv('0.0.8');
  9         10  
  9         51  
34              
35 9     9   679 use constant MAX_INT => ~0>>1; # anything better?
  9         11  
  9         418  
36 9     9   34 use constant DEFAULT_PROTOCOL => 1;
  9         11  
  9         5015  
37              
38             # cf. interval_secs
39             my %seconds;
40              
41             # maybe subclass if this mapping is bad?
42             my %serializers;
43              
44             =head1 SYNOPSIS
45              
46             Writer (of a single file):
47              
48             use File::Rsync::Mirror::Recentfile;
49             my $fr = File::Rsync::Mirror::Recentfile->new
50             (
51             interval => q(6h),
52             filenameroot => "RECENT",
53             comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54             localroot => "/home/ftp/pub/PAUSE/authors/",
55             aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
56             );
57             $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
58              
59             Reader/mirrorer:
60              
61             my $rf = File::Rsync::Mirror::Recentfile->new
62             (
63             filenameroot => "RECENT",
64             interval => q(6h),
65             localroot => "/home/ftp/pub/PAUSE/authors",
66             remote_dir => "",
67             remote_host => "pause.perl.org",
68             remote_module => "authors",
69             rsync_options => {
70             compress => 1,
71             'rsync-path' => '/usr/bin/rsync',
72             links => 1,
73             times => 1,
74             'omit-dir-times' => 1,
75             checksum => 1,
76             },
77             verbose => 1,
78             );
79             $rf->mirror;
80              
81             Aggregator (usually the writer):
82              
83             my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
84             $rf->aggregate;
85              
86             =head1 DESCRIPTION
87              
88             Lower level than F:R:M:Recent, handles one recentfile. Whereas a tree
89             is always composed of several recentfiles, controlled by the
90             F:R:M:Recent object. The Recentfile object has to do the bookkeeping
91             for a single timeslice.
92              
93             =head1 EXPORT
94              
95             No exports.
96              
97             =head1 CONSTRUCTORS / DESTRUCTOR
98              
99             =head2 my $obj = CLASS->new(%hash)
100              
101             Constructor. On every argument pair the key is a method name and the
102             value is an argument to that method name.
103              
104             If a recentfile for this resource already exists, metadata that are
105             not defined by the constructor will be fetched from there as soon as
106             it is being read by recent_events().
107              
108             =cut
109              
110             sub new {
111 650     650 1 354317796 my($class, @args) = @_;
112 650         2386 my $self = bless {}, $class;
113 650         3880 while (@args) {
114 2040         7062 my($method,$arg) = splice @args, 0, 2;
115 2040         7434 $self->$method($arg);
116             }
117 650 50       4814 unless (defined $self->protocol) {
118 650         3421 $self->protocol(DEFAULT_PROTOCOL);
119             }
120 650 100       4486 unless (defined $self->filenameroot) {
121 620         3735 $self->filenameroot("RECENT");
122             }
123 650 100       3735 unless (defined $self->serializer_suffix) {
124 626         3524 $self->serializer_suffix(".yaml");
125             }
126 650         3254 return $self;
127             }
128              
129             =head2 my $obj = CLASS->new_from_file($file)
130              
131             Constructor. $file is a I.
132              
133             =cut
134              
135             sub new_from_file {
136 1323     1323 1 456766 my($class, $file) = @_;
137 1323         2788 my $self = bless {}, $class;
138 1323         4114 $self->_rfile($file);
139             #?# $self->lock;
140 1323 50       5621 my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
  1323         40124  
141 1323         5585 local $/;
142 1323         35032 <$fh>;
143             };
144             # XXX: we can skip this step when the metadata are sufficient, but
145             # we cannot parse the file without some magic stuff about
146             # serialized formats
147 1323         15770 while (-l $file) {
148 37         1113 my($name,$path) = fileparse $file;
149 37         252 my $symlink = readlink $file;
150 37 50       142 if ($symlink =~ m|/|) {
151 0         0 die "FIXME: filenames containing '/' not supported, got $symlink";
152             }
153 37         748 $file = File::Spec->catfile ( $path, $symlink );
154             }
155 1323         88755 my($name,$path,$suffix) = fileparse $file, keys %serializers;
156 1323         5350 $self->serializer_suffix($suffix);
157 1323         7481 $self->localroot($path);
158 1323 50       5635 die "Could not determine file format from suffix" unless $suffix;
159 1323         1380 my $deserialized;
160 1323 50       3149 if ($suffix eq ".yaml") {
    0          
161 1323         8003 require YAML::Syck;
162 1323         4100 $deserialized = YAML::Syck::LoadFile($file);
163             } elsif ($HAVE->{"Data::Serializer"}) {
164             my $serializer = Data::Serializer->new
165 0         0 ( serializer => $serializers{$suffix} );
166 0         0 $deserialized = $serializer->raw_deserialize($serialized);
167             } else {
168 0         0 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
169             }
170 1323         724601 while (my($k,$v) = each %{$deserialized->{meta}}) {
  15663         63022  
171 14340 100       22789 next if $k ne lc $k; # "Producers"
172 13017         25785 $self->$k($v);
173             }
174 1323 50       3666 unless (defined $self->protocol) {
175 0         0 $self->protocol(DEFAULT_PROTOCOL);
176             }
177 1323         32679 return $self;
178             }
179              
180             =head2 DESTROY
181              
182             A simple unlock.
183              
184             =cut
185             sub DESTROY {
186 6254     6254   120707943 my $self = shift;
187 6254         13090 $self->unlock;
188 6254 100       29864 unless ($self->_current_tempfile_fh) {
189 6250 100       23375 if (my $tempfile = $self->_current_tempfile) {
190 156 100       9648 if (-e $tempfile) {
191             # unlink $tempfile; # may fail in global destruction
192             }
193             }
194             }
195             }
196              
197             =head1 ACCESSORS
198              
199             =cut
200              
201             my @accessors;
202              
203             BEGIN {
204 9     9   41 @accessors = (
205             "_current_tempfile",
206             "_current_tempfile_fh",
207             "_delayed_operations",
208             "_done",
209             "_interval",
210             "_is_locked",
211             "_localroot",
212             "_merged",
213             "_pathdb",
214             "_remember_last_uptodate_call",
215             "_remote_dir",
216             "_remoteroot",
217             "_requires_fsck",
218             "_rfile",
219             "_rsync",
220             "__verified_tempdir",
221             "_seeded",
222             "_uptodateness_ever_reached",
223             "_use_tempfile",
224             );
225              
226 9         310 my @pod_lines =
227 9         20 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
  1296         1467  
228              
229             =over 4
230              
231             =item aggregator
232              
233             A list of interval specs that tell the aggregator which Is
234             are to be produced.
235              
236             =item canonize
237              
238             The name of a method to canonize the path before rsyncing. Only
239             supported value is C. Defaults to that.
240              
241             =item comment
242              
243             A comment about this tree and setup.
244              
245             =item dirtymark
246              
247             A timestamp. The dirtymark is updated whenever an out of band change
248             on the origin server is performed that violates the protocol. Say,
249             they add or remove files in the middle somewhere. Slaves must react
250             with a devaluation of their C structure which then leads to a
251             full re-sync of all files. Implementation note: dirtymark may increase
252             or decrease.
253              
254             =item filenameroot
255              
256             The (prefix of the) filename we use for this I. Defaults to
257             C. The string must not contain a directory separator.
258              
259             =item have_mirrored
260              
261             Timestamp remembering when we mirrored this recentfile the last time.
262             Only relevant for slaves.
263              
264             =item ignore_link_stat_errors
265              
266             If set to true, rsync errors are ignored that complain about link stat
267             errors. These seem to happen only when there are files missing at the
268             origin. In race conditions this can always happen, so it defaults to
269             true.
270              
271             =item is_slave
272              
273             If set to true, this object will fetch a new recentfile from remote
274             when the timespan between the last mirror (see have_mirrored) and now
275             is too large (see C).
276              
277             =item keep_delete_objects_forever
278              
279             The default for delete events is that they are passed through the
280             collection of recentfile objects until they reach the Z file. There
281             they get dropped so that the associated file object ceases to exist at
282             all. By setting C the delete objects are
283             kept forever. This makes the Z file larger but has the advantage that
284             slaves that have interrupted mirroring for a long time still can clean
285             up their copy.
286              
287             =item locktimeout
288              
289             After how many seconds shall we die if we cannot lock a I?
290             Defaults to 600 seconds.
291              
292             =item loopinterval
293              
294             When mirror_loop is called, this accessor can specify how much time
295             every loop shall at least take. If the work of a loop is done before
296             that time has gone, sleeps for the rest of the time. Defaults to
297             arbitrary 42 seconds.
298              
299             =item max_files_per_connection
300              
301             Maximum number of files that are transferred on a single rsync call.
302             Setting it higher means higher performance at the price of holding
303             connections longer and potentially disturbing other users in the pool.
304             Defaults to the arbitrary value 42.
305              
306             =item max_rsync_errors
307              
308             When rsync operations encounter that many errors without any resetting
309             success in between, then we die. Defaults to unlimited. A value of
310             -1 means we run forever ignoring all rsync errors.
311              
312             =item minmax
313              
314             Hashref remembering when we read the recent_events from this file the
315             last time and what the timespan was.
316              
317             =item protocol
318              
319             When the RECENT file format changes, we increment the protocol. We try
320             to support older protocols in later releases.
321              
322             =item remote_host
323              
324             The host we are mirroring from. Leave empty for the local filesystem.
325              
326             =item remote_module
327              
328             Rsync servers have so called modules to separate directory trees from
329             each other. Put here the name of the module under which we are
330             mirroring. Leave empty for local filesystem.
331              
332             =item rsync_options
333              
334             Things like compress, links, times or checksums. Passed in to the
335             File::Rsync object used to run the mirror.
336              
337             =item serializer_suffix
338              
339             Mostly untested accessor. The only well tested format for
340             Is at the moment is YAML. It is used with YAML::Syck via
341             Data::Serializer. But in principle other formats are supported as
342             well. See section SERIALIZERS below.
343              
344             =item sleep_per_connection
345              
346             Sleep that many seconds (floating point OK) after every chunk of rsyncing
347             has finished. Defaults to arbitrary 0.42.
348              
349             =item tempdir
350              
351             Directory to write temporary files to. Must allow rename operations
352             into the tree which usually means it must live on the same partition
353             as the target directory. Defaults to C<< $self->localroot >>.
354              
355             =item ttl
356              
357             Time to live. Number of seconds after which this recentfile must be
358             fetched again from the origin server. Only relevant for slaves.
359             Defaults to arbitrary 24.2 seconds.
360              
361             =item verbose
362              
363             Boolean to turn on a bit verbosity.
364              
365             =item verboselog
366              
367             Path to the logfile to write verbose progress information to. This is
368             a primitive stop gap solution to get simple verbose logging working.
369             Switching to Log4perl or similar is probably the way to go.
370              
371             =back
372              
373             =cut
374              
375 9     9   3625 use accessors @accessors;
  9         6088  
  9         32  
376              
377             =head1 METHODS
378              
379             =head2 (void) $obj->aggregate( %options )
380              
381             Takes all intervals that are collected in the accessor called
382             aggregator. Sorts them by actual length of the interval.
383             Removes those that are shorter than our own interval. Then merges this
384             object into the next larger object. The merging continues upwards
385             as long as the next I is old enough to warrant a merge.
386              
387             If a merge is warranted is decided according to the interval of the
388             previous interval so that larger files are not so often updated as
389             smaller ones. If $options{force} is true, all files get updated.
390              
391             Here is an example to illustrate the behaviour. Given aggregators
392              
393             1h 1d 1W 1M 1Q 1Y Z
394              
395             then
396              
397             1h updates 1d on every call to aggregate()
398             1d updates 1W earliest after 1h
399             1W updates 1M earliest after 1d
400             1M updates 1Q earliest after 1W
401             1Q updates 1Y earliest after 1M
402             1Y updates Z earliest after 1Q
403              
404             Note that all but the smallest recentfile get updated at an arbitrary
405             rate and as such are quite useless on their own.
406              
407             =cut
408              
409             sub aggregate {
410 424     424 1 21699896 my($self, %option) = @_;
411 424         636 my %seen_interval;
412 3512         5642 my @aggs = sort { $a->{secs} <=> $b->{secs} }
413 2240 50       8380 grep { !$seen_interval{$_->{interval}}++ && $_->{secs} >= $self->interval_secs }
414 2240         5524 map { { interval => $_, secs => $self->interval_secs($_)} }
415 424 50       1938 $self->interval, @{$self->aggregator || []};
  424         1058  
416 424         1254 $self->update;
417 424         10334 $aggs[0]{object} = $self;
418 424         1704 AGGREGATOR: for my $i (0..$#aggs-1) {
419 1180         2168 my $this = $aggs[$i]{object};
420 1180         3028 my $next = $this->_sparse_clone;
421 1180         3926 $next->interval($aggs[$i+1]{interval});
422 1180         1366 my $want_merge = 0;
423 1180 100 100     4820 if ($option{force} || $i == 0) {
424 724         1048 $want_merge = 1;
425             } else {
426 456         1014 my $next_rfile = $next->rfile;
427 456 100       6438 if (-e $next_rfile) {
428 384         1170 my $prev = $aggs[$i-1]{object};
429 384         1824 local $^T = time;
430 384         2802 my $next_age = 86400 * -M $next_rfile;
431 384 100       954 if ($next_age > $prev->interval_secs) {
432 66         150 $want_merge = 1;
433             }
434             } else {
435 72         126 $want_merge = 1;
436             }
437             }
438 1180 100       2210 if ($want_merge) {
439 862         1906 $next->merge($this);
440 862         18890 $aggs[$i+1]{object} = $next;
441             } else {
442 318         1338 last AGGREGATOR;
443             }
444             }
445             }
446              
447             # collect file size and mtime for all files of this aggregate
448             sub _debug_aggregate {
449 36     36   29160 my($self) = @_;
450 324         384 my @aggs = sort { $a->{secs} <=> $b->{secs} }
451 216         474 map { { interval => $_, secs => $self->interval_secs($_)} }
452 36 50       96 $self->interval, @{$self->aggregator || []};
  36         90  
453 36         102 my $report = [];
454 36         120 for my $i (0..$#aggs) {
455 216         8910 my $this = Storable::dclone $self;
456 216         462 $this->interval($aggs[$i]{interval});
457 216         288 my $rfile = $this->rfile;
458 216         2298 my @stat = stat $rfile;
459 216         906 push @$report, {rfile => $rfile, size => $stat[7], mtime => $stat[9]};
460             }
461 36         372 $report;
462             }
463              
464             # (void) $self->_assert_symlink()
465             sub _assert_symlink {
466 1973     1973   2769 my($self) = @_;
467 1973         3981 my $recentrecentfile = File::Spec->catfile
468             (
469             $self->localroot,
470             sprintf
471             (
472             "%s.recent",
473             $self->filenameroot
474             )
475             );
476 1973 50       51405 if ($Config{d_symlink} eq "define") {
477 1973         1649 my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
478 1973 100       23288 if (-l $recentrecentfile) {
479 1948         9928 my $found_symlink = readlink $recentrecentfile;
480 1948 100       3744 if ($found_symlink eq $self->rfilename) {
481 1930         3316 return;
482             } else {
483 18         36 $howto_create_symlink = 2;
484             }
485             } else {
486 25         32 $howto_create_symlink = 1;
487             }
488 43 100       68 if (1 == $howto_create_symlink) {
489 25 50       51 symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
490             } else {
491 18         222 unlink "$recentrecentfile.$$"; # may fail
492 18 50       30 symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
493 18 50       594 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
494             }
495             } else {
496 0         0 warn "Warning: symlinks not supported on this system, doing a copy instead\n";
497 0         0 unlink "$recentrecentfile.$$"; # may fail
498 0 0       0 cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
499 0 0       0 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
500             }
501             }
502              
503             =head2 $hashref = $obj->delayed_operations
504              
505             A hash of hashes containing unlink and rmdir operations which had to
506             wait until the recentfile got unhidden in order to not confuse
507             downstream mirrors (in case we have some).
508              
509             =cut
510              
511             sub delayed_operations {
512 47     47 1 85 my($self) = @_;
513 47         241 my $x = $self->_delayed_operations;
514 47 100       302 unless (defined $x) {
515 16         109 $x = {
516             unlink => {},
517             rmdir => {},
518             };
519 16         67 $self->_delayed_operations ($x);
520             }
521 47         234 return $x;
522             }
523              
524             =head2 $done = $obj->done
525              
526             C<$done> is a reference to a L
527             object that keeps track of rsync activities. Only needed and used when
528             we are a mirroring slave.
529              
530             =cut
531              
532             sub done {
533 125     125 1 585 my($self) = @_;
534 125         790 my $done = $self->_done;
535 125 100       1605 if (!$done) {
536 16         179 require File::Rsync::Mirror::Recentfile::Done;
537 16         285 $done = File::Rsync::Mirror::Recentfile::Done->new();
538 16         72 $done->_rfinterval ($self->interval);
539 16         168 $self->_done ( $done );
540             }
541 125         599 return $done;
542             }
543              
544             =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
545              
546             Stores the remote I locally as a tempfile. The caller is
547             responsible to remove the file after use.
548              
549             Note: if you're intending to act as an rsync server for other slaves,
550             then you must prefer this method to fetch that file with
551             get_remotefile(). Otherwise downstream mirrors would expect you to
552             already have mirrored all the files that are in the I
553             before you have them mirrored.
554              
555             =cut
556              
557             sub get_remote_recentfile_as_tempfile {
558 76     76 1 145 my($self) = @_;
559 76         335 mkpath $self->localroot;
560 76         4820 my $fh;
561             my $trfilename;
562 76 100       335 if ( $self->_use_tempfile() ) {
563 43 100       367 if ($self->ttl_reached) {
564 10         92 $fh = $self->_current_tempfile_fh;
565 10         82 $trfilename = $self->rfilename;
566             } else {
567 33         120 return $self->_current_tempfile;
568             }
569             } else {
570 33         323 $trfilename = $self->rfilename;
571             }
572              
573 43         82 my $dst;
574 43 50       146 if ($fh) {
575 0         0 $dst = $self->_current_tempfile;
576             } else {
577 43         202 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
578 43         237 $dst = $fh->filename;
579 43         416 $self->_current_tempfile ($dst);
580 43         293 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
  43         216  
581 43 100 66     972 if (defined $rfile && -e $rfile) {
582             # saving on bandwidth. Might need to be configurable
583             # $self->bandwidth_is_cheap?
584 27 50       325 cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
585             }
586             }
587 43         9957 my $src = join ("/",
588             $self->remoteroot,
589             $trfilename,
590             );
591 43 50       198 if ($self->verbose) {
592 0 0       0 my $doing = -e $dst ? "Sync" : "Get";
593 0         0 my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
594 0         0 my $LFH = $self->_logfilehandle;
595 0         0 printf $LFH
596             (
597             "%-4s %d (1/1/%s) temp %s ... ",
598             $doing,
599             time,
600             $self->interval,
601             $display_dst,
602             );
603             }
604 43         283 my $gaveup = 0;
605 43         71 my $retried = 0;
606 43         580 local($ENV{LANG}) = "C";
607 43         263 while (!$self->rsync->exec(
608             src => $src,
609             dst => $dst,
610             )) {
611 0         0 $self->register_rsync_error ($self->rsync->err);
612 0 0       0 if (++$retried >= 3) {
613 0         0 warn "XXX giving up";
614 0         0 $gaveup = 1;
615 0         0 last;
616             }
617             }
618 43 50       2052855 if ($gaveup) {
619 0         0 my $LFH = $self->_logfilehandle;
620 0         0 printf $LFH "Warning: gave up mirroring %s, will try again later", $self->interval;
621             } else {
622 43         566 $self->_refresh_internals ($dst);
623 43         660 $self->have_mirrored (Time::HiRes::time);
624 43         366 $self->un_register_rsync_error ();
625             }
626 43         208 $self->unseed;
627 43 50       188 if ($self->verbose) {
628 0         0 my $LFH = $self->_logfilehandle;
629 0         0 print $LFH "DONE\n";
630             }
631 43         272 my $mode = 0644;
632 43 50       1697 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
633 43         789 return $dst;
634             }
635              
636             sub _verified_tempdir {
637 43     43   59 my($self) = @_;
638 43         169 my $tempdir = $self->__verified_tempdir();
639 43 100       324 return $tempdir if defined $tempdir;
640 22 50       85 unless ($tempdir = $self->tempdir) {
641 22         166 $tempdir = $self->localroot;
642             }
643 22 50       331 unless (-d $tempdir) {
644 0         0 mkpath $tempdir;
645             }
646 22         109 $self->__verified_tempdir($tempdir);
647 22         122 return $tempdir;
648             }
649              
650             sub _get_remote_rat_provide_tempfile_object {
651 43     43   120 my($self, $trfilename) = @_;
652 43         151 my $_verified_tempdir = $self->_verified_tempdir;
653 43         231 my $fh = File::Temp->new
654             (TEMPLATE => sprintf(".FRMRecent-%s-XXXX",
655             $trfilename,
656             ),
657             DIR => $_verified_tempdir,
658             SUFFIX => $self->serializer_suffix,
659             UNLINK => $self->_use_tempfile,
660             );
661 43         23244 my $mode = 0644;
662 43         346 my $dst = $fh->filename;
663 43 50       1056 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
664 43 100       171 if ($self->_use_tempfile) {
665 10         164 $self->_current_tempfile_fh ($fh); # delay self destruction
666             }
667 43         329 return $fh;
668             }
669              
670             sub _logfilehandle {
671 0     0   0 my($self) = @_;
672 0         0 my $fh;
673 0 0       0 if (my $vl = $self->verboselog) {
674 0 0       0 open $fh, ">>", $vl or die "Could not open >> '$vl': $!";
675             } else {
676 0         0 $fh = \*STDERR;
677             }
678 0         0 return $fh;
679             }
680              
681             =head2 $localpath = $obj->get_remotefile ( $relative_path )
682              
683             Rsyncs one single remote file to local filesystem.
684              
685             Note: no locking is done on this file. Any number of processes may
686             mirror this object.
687              
688             Note II: do not use for recentfiles. If you are a cascading
689             slave/server combination, it would confuse other slaves. They would
690             expect the contents of these recentfiles to be available. Use
691             get_remote_recentfile_as_tempfile() instead.
692              
693             =cut
694              
695             sub get_remotefile {
696 0     0 1 0 my($self, $path) = @_;
697 0         0 my $dst = File::Spec->catfile($self->localroot, $path);
698 0         0 mkpath dirname $dst;
699 0 0       0 if ($self->verbose) {
700 0 0       0 my $doing = -e $dst ? "Sync" : "Get";
701 0         0 my $LFH = $self->_logfilehandle;
702 0         0 printf $LFH
703             (
704             "%-4s %d (1/1/%s) %s ... ",
705             $doing,
706             time,
707             $self->interval,
708             $path,
709             );
710             }
711 0         0 local($ENV{LANG}) = "C";
712 0 0       0 my $remoteroot = $self->remoteroot or die "Alert: missing remoteroot. Cannot continue";
713 0         0 while (!$self->rsync->exec(
714             src => join("/",
715             $remoteroot,
716             $path),
717             dst => $dst,
718             )) {
719 0         0 $self->register_rsync_error ($self->rsync->err);
720             }
721 0         0 $self->un_register_rsync_error ();
722 0 0       0 if ($self->verbose) {
723 0         0 my $LFH = $self->_logfilehandle;
724 0         0 print $LFH "DONE\n";
725             }
726 0         0 return $dst;
727             }
728              
729             =head2 $obj->interval ( $interval_spec )
730              
731             Get/set accessor. $interval_spec is a string and described below in
732             the section INTERVAL SPEC.
733              
734             =cut
735              
736             sub interval {
737 86432     86432 1 100916 my ($self, $interval) = @_;
738 86432 100       134217 if (@_ >= 2) {
739 6057         13332 $self->_interval($interval);
740 6057         23438 $self->_rfile(undef);
741             }
742 86432         141781 $interval = $self->_interval;
743 86432 100       274006 unless (defined $interval) {
744             # do not ask the $self too much, it recurses!
745 1         5 require Carp;
746 1         167 Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
747             }
748 86431         188252 return $interval;
749             }
750              
751             =head2 $secs = $obj->interval_secs ( $interval_spec )
752              
753             $interval_spec is described below in the section INTERVAL SPEC. If
754             empty defaults to the inherent interval for this object.
755              
756             =cut
757              
758             sub interval_secs {
759 31053     31053 1 170701 my ($self, $interval) = @_;
760 31053   66     60661 $interval ||= $self->interval;
761 31052 50       44241 unless (defined $interval) {
762 0         0 die "interval_secs() called without argument on an object without a declared one";
763             }
764 31052 100       122412 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
765             die "Could not determine seconds from interval[$interval]";
766 31051 100 33     137309 if ($interval eq "Z") {
    50          
767 1144         3267 return MAX_INT;
768             } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
769 29907         87055 return $seconds{$t}*$n;
770             } else {
771 0         0 die "Invalid interval specification: n[$n]t[$t]";
772             }
773             }
774              
775             =head2 $obj->localroot ( $localroot )
776              
777             Get/set accessor. The local root of the tree. Guaranteed without
778             trailing slash.
779              
780             =cut
781              
782             sub localroot {
783 13430     13430 1 14316 my ($self, $localroot) = @_;
784 13430 100       23941 if (@_ >= 2) {
785 1987         7109 $localroot =~ s|/$||;
786 1987         5353 $self->_localroot($localroot);
787 1987         8398 $self->_rfile(undef);
788             }
789 13430         29076 $localroot = $self->_localroot;
790             }
791              
792             =head2 $ret = $obj->local_path($path_found_in_recentfile)
793              
794             Combines the path to our local mirror and the path of an object found
795             in this I. In other words: the target of a mirror operation.
796              
797             Implementation note: We split on slashes and then use
798             File::Spec::catfile to adjust to the local operating system.
799              
800             =cut
801              
802             sub local_path {
803 1396     1396 1 1650 my($self,$path) = @_;
804 1396 50       1891 unless (defined $path) {
805             # seems like a degenerated case
806 0         0 return $self->localroot;
807             }
808 1396         3294 my @p = split m|/|, $path;
809 1396         1833 File::Spec->catfile($self->localroot,@p);
810             }
811              
812             =head2 (void) $obj->lock
813              
814             Locking is implemented with an C on a locking directory
815             (C<.lock> appended to $rfile).
816              
817             =cut
818              
819             sub lock {
820 3697     3697 1 4352 my ($self) = @_;
821             # not using flock because it locks on filehandles instead of
822             # old school ressources.
823 3697 50       7797 my $locked = $self->_is_locked and return;
824 3697         17166 my $rfile = $self->rfile;
825             # XXX need a way to allow breaking the lock
826 3697         4786 my $start = time;
827 3697   50     7864 my $locktimeout = $self->locktimeout || 600;
828 3697         17116 my %have_warned;
829 3697         6655 my $lockdir = "$rfile.lock";
830 3697         4405 my $procfile = "$lockdir/process";
831 3697         195805 GETLOCK: while (not mkdir $lockdir) {
832 0 0       0 if (open my $fh, "<", $procfile) {
833 0         0 chomp(my $process = <$fh>);
834 0 0       0 if (0) {
    0          
    0          
835 0         0 } elsif ($process !~ /^\d+$/) {
836 0 0       0 warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown}++;
837             } elsif ($$ == $process) {
838 0         0 last GETLOCK;
839             } elsif (kill 0, $process) {
840 0 0       0 warn "Warning: process $process holds a lock in '$lockdir', waiting..." unless $have_warned{$process}++;
841             } else {
842 0         0 warn "Warning: breaking lock held by process $process";
843 0         0 sleep 1;
844 0         0 last GETLOCK;
845             }
846             } else {
847 0 0       0 warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown}++;
848             }
849 0         0 Time::HiRes::sleep 0.01;
850 0 0       0 if (time - $start > $locktimeout) {
851 0         0 die "Could not acquire lockdirectory '$rfile.lock': $!";
852             }
853             } # GETLOCK
854 3697 50       182856 open my $fh, ">", $procfile or die "Could not open >$procfile\: $!";
855 3697         34731 print $fh $$, "\n";
856 3697 50       85037 close $fh or die "Could not close: $!";
857 3697         12507 $self->_is_locked (1);
858             }
859              
860             =head2 (void) $obj->merge ($other)
861              
862             Bulk update of this object with another one. It's used to merge a
863             smaller and younger $other object into the current one. If this file
864             is a C file, then we normally do not merge in objects of type
865             C; this can be overridden by setting
866             keep_delete_objects_forever. But if we encounter an object of type
867             delete we delete the corresponding C object if we have it.
868              
869             If there is nothing to be merged, nothing is done.
870              
871             =cut
872              
873             sub merge {
874 862     862 1 1346 my($self, $other) = @_;
875 862         2112 $self->_merge_sanitycheck ( $other );
876 862         1864 $other->lock;
877 862   50     6392 my $other_recent = $other->recent_events || [];
878 862         2000 $self->lock;
879 862         6654 $self->_merge_locked ( $other, $other_recent );
880 862         8932 $self->unlock;
881 862         4222 $other->unlock;
882             }
883              
884             sub _merge_locked {
885 862     862   1574 my($self, $other, $other_recent) = @_;
886 862   50     1474 my $my_recent = $self->recent_events || [];
887              
888             # calculate the target time span
889 862 100       2044 my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
890 862 50       2032 my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
891 862         922 my $oldest_allowed = 0;
892 862         662 my $something_done;
893 862 100       1624 unless ($my_recent->[0]) {
894             # obstetrics
895 90         96 $something_done = 1;
896             }
897 862 50       1592 if ($epoch) {
898 862 100 50     1730 if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
    100 100        
899 278         3438 $oldest_allowed = 0;
900 278         322 $something_done = 1;
901             } elsif (my $merged = $self->merged) {
902 534         1164 my $secs = $self->interval_secs();
903 534   50     3490 $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
904 534 50 33     2610 if (@$other_recent and
905             _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)
906             ) {
907 0         0 $oldest_allowed = $other_recent->[-1]{epoch};
908             }
909             }
910 862   100     3664 while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
911 2020         2334 pop @$my_recent;
912 2020         6386 $something_done = 1;
913             }
914             }
915              
916 862         1482 my %have_path;
917 862         1494 my $other_recent_filtered = [];
918 862         1914 for my $oev (@$other_recent) {
919 29296   50     44918 my $oevepoch = $oev->{epoch} || 0;
920 29296 50       39956 next if _bigfloatlt($oevepoch, $oldest_allowed);
921 29296         36154 my $path = $oev->{path};
922 29296 50       62530 next if $have_path{$path}++;
923 29296 100 100     37148 if ( $self->interval eq "Z"
      66        
924             and $oev->{type} eq "delete"
925             and ! $self->keep_delete_objects_forever
926             ) {
927             # do nothing
928             } else {
929 29272 100 100     59884 if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
930 5574         4774 $something_done = 1;
931             }
932 29272         100518 push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
933             }
934             }
935 862 100       2790 if ($something_done) {
936 812         2240 $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
937             }
938             }
939              
940             sub _merge_something_done {
941 812     812   1350 my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
942 812         1246 my $recent = [];
943 812         1028 my $epoch_conflict = 0;
944 812         708 my $last_epoch;
945 812   100     2392 ZIP: while (@$other_recent_filtered || @$my_recent) {
946 67072         42300 my $event;
947 67072 100 100     177844 if (!@$my_recent ||
      66        
948             @$other_recent_filtered && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch})) {
949 28370         25320 $event = shift @$other_recent_filtered;
950             } else {
951 38702         30674 $event = shift @$my_recent;
952 38702 100       122324 next ZIP if $have_path->{$event->{path}}++;
953             }
954 44380 100 100     122718 $epoch_conflict=1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
955 44380         33154 $last_epoch = $event->{epoch};
956 44380         96592 push @$recent, $event;
957             }
958 812 100       1424 if ($epoch_conflict) {
959 12         18 my %have_epoch;
960 12         42 for (my $i = $#$recent;$i>=0;$i--) {
961 324         228 my $epoch = $recent->[$i]{epoch};
962 324 100       786 if ($have_epoch{$epoch}++) {
963 12         36 while ($have_epoch{$epoch}) {
964 12         42 $epoch = _increase_a_bit($epoch);
965             }
966 12         24 $recent->[$i]{epoch} = $epoch;
967 12         36 $have_epoch{$epoch}++;
968             }
969             }
970             }
971 812 100 100     2530 if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
972 278         3852 $self->dirtymark ( $other->dirtymark );
973             }
974 812         8580 $self->write_recent($recent);
975             $other->merged({
976             time => Time::HiRes::time, # not used anywhere
977             epoch => $recent->[0]{epoch},
978 812         4812 into_interval => $self->interval, # not used anywhere
979             });
980 812         1848 $other->write_recent($other_recent);
981             }
982              
983             sub _merge_sanitycheck {
984 862     862   1018 my($self, $other) = @_;
985 862 50       1624 if ($self->interval_secs <= $other->interval_secs) {
986 0         0 require Carp;
987 0         0 Carp::confess
988             (sprintf
989             (
990             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
991             $self->interval_secs,
992             $other->interval_secs,
993             ));
994             }
995             }
996              
997             =head2 merged
998              
999             Hashref denoting when this recentfile has been merged into some other
1000             at which epoch.
1001              
1002             =cut
1003              
1004             sub merged {
1005 17358     17358 1 23722 my($self, $set) = @_;
1006 17358 100       26410 if (defined $set) {
1007 5438         11177 $self->_merged ($set);
1008             }
1009 17358         42797 my $merged = $self->_merged;
1010 17358         40118 my $into;
1011 17358 100 100     71477 if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
      100        
1012             # sanity checks
1013 11189 50       51560 if ($into eq $self->interval) {
    50          
1014 0         0 require Carp;
1015 0         0 Carp::cluck(sprintf
1016             (
1017             "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
1018             $into,
1019             $self->interval,
1020             ));
1021             } elsif ($self->interval_secs($into) < $self->interval_secs) {
1022 0         0 require Carp;
1023 0         0 Carp::cluck(sprintf
1024             (
1025             "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
1026             $self->interval_secs($into),
1027             $self->interval_secs,
1028             $self->interval,
1029             ));
1030             }
1031             }
1032 17358         34710 $merged;
1033             }
1034              
1035             =head2 $hashref = $obj->meta_data
1036              
1037             Returns the hashref of metadata that the server has to add to the
1038             I.
1039              
1040             =cut
1041              
1042             sub meta_data {
1043 3241     3241 1 4284 my($self) = @_;
1044 3241         4477 my $ret = $self->{meta};
1045 3241         5166 for my $m (
1046             "aggregator",
1047             "canonize",
1048             "comment",
1049             "dirtymark",
1050             "filenameroot",
1051             "interval",
1052             "merged",
1053             "minmax",
1054             "protocol",
1055             "serializer_suffix",
1056             ) {
1057 32410         51249 my $v = $self->$m;
1058 32410 100       78131 if (defined $v) {
1059 27863         45420 $ret->{$m} = $v;
1060             }
1061             }
1062             # XXX need to reset the Producer if I am a writer, keep it when I
1063             # am a reader
1064             $ret->{Producers} ||= {
1065 3241   50     39404 __PACKAGE__, "$VERSION", # stringified it looks better
1066             '$0', $0,
1067             'time', Time::HiRes::time,
1068             };
1069 3241   66     7301 $ret->{dirtymark} ||= Time::HiRes::time;
1070 3241         7958 return $ret;
1071             }
1072              
1073             =head2 $success = $obj->mirror ( %options )
1074              
1075             Mirrors the files in this I as reported by
1076             C. Options named C, C, C are passed
1077             through to the C call. The boolean option C,
1078             if true, causes C to only rsync C
1079             and keep track of the rsynced files so that future calls will rsync
1080             different files until all files are brought to sync.
1081              
1082             =cut
1083              
1084             sub mirror {
1085 32     32 1 5094 my($self, %options) = @_;
1086 32         158 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
1087 32         3254 $self->_use_tempfile (1);
1088             # skip-deletes is inadequat for passthrough within mirror. We
1089             # would never reach uptodateness when a delete were on a
1090             # borderline
1091 32         176 my %passthrough = map { ($_ => $options{$_}) } qw(before after max);
  96         497  
1092 32         247 my ($recent_events) = $self->recent_events(%passthrough);
1093 32         62 my(@error, @dlcollector); # download-collector: array containing paths we need
1094 32         67 my $first_item = 0;
1095 32         59 my $last_item = $#$recent_events;
1096 32         187 my $done = $self->done;
1097 32         138 my $pathdb = $self->_pathdb;
1098 32         222 ITEM: for my $i ($first_item..$last_item) {
1099 2464         2172 my $status = +{};
1100 2464         4252 $self->_mirror_item
1101             (
1102             $i,
1103             $recent_events,
1104             $last_item,
1105             $done,
1106             $pathdb,
1107             \@dlcollector,
1108             \%options,
1109             $status,
1110             \@error,
1111             );
1112 2464 100       4429 last if $i == $last_item;
1113 2435 100       5050 if ($status->{mustreturn}){
1114 3 100 66     56 if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
1115             # looks like a bug somewhere else
1116 2         69 my $t = $self->_current_tempfile;
1117 2 50       191 unlink $t or die "Could not unlink '$t': $!";
1118 2         11 $self->_current_tempfile(undef);
1119 2         24 $self->_use_tempfile(0);
1120             }
1121 3         1239 return;
1122             }
1123             }
1124 29 100       130 if (@dlcollector) {
1125 18         44 my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
  18         133  
1126 18 50 33     211 if (!$success || $@) {
1127 0         0 warn "Warning: Unknown error while mirroring: $@";
1128 0         0 push @error, $@;
1129 0         0 sleep 1;
1130             }
1131             }
1132 29 50       209 if ($self->verbose) {
1133 0         0 my $LFH = $self->_logfilehandle;
1134 0         0 print $LFH "DONE\n";
1135             }
1136             # once we've gone to the end we consider ourselves free of obligations
1137 29         356 $self->unseed;
1138 29         204 $self->_mirror_unhide_tempfile ($trecentfile);
1139 29         279 $self->_mirror_perform_delayed_ops(\%options);
1140 29         4750 return !@error;
1141             }
1142              
1143             sub _mirror_item {
1144 2464     2464   2895 my($self,
1145             $i,
1146             $recent_events,
1147             $last_item,
1148             $done,
1149             $pathdb,
1150             $dlcollector,
1151             $options,
1152             $status,
1153             $error,
1154             ) = @_;
1155 2464         2291 my $recent_event = $recent_events->[$i];
1156 2464 100       5610 return if $done->covered ( $recent_event->{epoch} );
1157 1396 100       2148 if ($pathdb) {
1158 592         1006 my $rec = $pathdb->{$recent_event->{path}};
1159 592 50 66     937 if ($rec && $rec->{recentepoch}) {
1160 40 50       98 if (_bigfloatgt
1161             ( $rec->{recentepoch}, $recent_event->{epoch} )){
1162 0         0 $done->register ($recent_events, [$i]);
1163 0         0 return;
1164             }
1165             }
1166             }
1167 1396         2249 my $dst = $self->local_path($recent_event->{path});
1168 1396 100       12932 if ($recent_event->{type} eq "new"){
    50          
1169 1372         2174 $self->_mirror_item_new
1170             (
1171             $dst,
1172             $i,
1173             $last_item,
1174             $recent_events,
1175             $recent_event,
1176             $dlcollector,
1177             $pathdb,
1178             $status,
1179             $error,
1180             $options,
1181             );
1182             } elsif ($recent_event->{type} eq "delete") {
1183 24         35 my $activity;
1184 24 50       61 if ($options->{'skip-deletes'}) {
1185 0         0 $activity = "skipped";
1186             } else {
1187 24         653 my @lstat = lstat $dst;
1188 24 100 33     340 if (! -e _) {
    50          
1189 6         37 $activity = "not_found";
1190             } elsif (-l _ or not -d _) {
1191 18         60 $self->delayed_operations->{unlink}{$dst}++;
1192 18         42 $activity = "deleted";
1193             } else {
1194 0         0 $self->delayed_operations->{rmdir}{$dst}++;
1195 0         0 $activity = "deleted";
1196             }
1197             }
1198 24         105 $done->register ($recent_events, [$i]);
1199 24 100       91 if ($pathdb) {
1200 6         114 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1201             }
1202             } else {
1203 0         0 warn "Warning: invalid upload type '$recent_event->{type}'";
1204             }
1205             }
1206              
1207             sub _mirror_item_new {
1208 1372     1372   1855 my($self,
1209             $dst,
1210             $i,
1211             $last_item,
1212             $recent_events,
1213             $recent_event,
1214             $dlcollector,
1215             $pathdb,
1216             $status,
1217             $error,
1218             $options,
1219             ) = @_;
1220 1372 50       1898 if ($self->verbose) {
1221 0 0       0 my $doing = -e $dst ? "Sync" : "Get";
1222 0         0 my $LFH = $self->_logfilehandle;
1223             printf $LFH
1224             (
1225             "%-4s %d (%d/%d/%s) %s ... ",
1226             $doing,
1227             time,
1228             1+$i,
1229             1+$last_item,
1230             $self->interval,
1231             $recent_event->{path},
1232 0         0 );
1233             }
1234 1372   50     4852 my $max_files_per_connection = $self->max_files_per_connection || 42;
1235 1372         3555 my $success;
1236 1372 50       1706 if ($self->verbose) {
1237 0         0 my $LFH = $self->_logfilehandle;
1238 0         0 print $LFH "\n";
1239             }
1240 1372         5882 push @$dlcollector, { rev => $recent_event, i => $i };
1241 1372 100       2106 if (@$dlcollector >= $max_files_per_connection) {
1242 15         31 $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
  15         78  
1243 15         107 my $sleep = $self->sleep_per_connection;
1244 15 50       129 $sleep = 0.42 unless defined $sleep;
1245 15         6301656 Time::HiRes::sleep $sleep;
1246 15 100       223 if ($options->{piecemeal}) {
1247 3         40 $status->{mustreturn} = 1;
1248 3         34 return;
1249             }
1250             } else {
1251 1357         2192 return;
1252             }
1253 12 50 33     204 if (!$success || $@) {
1254 0         0 warn "Warning: Error while mirroring: $@";
1255 0         0 push @$error, $@;
1256 0         0 sleep 1;
1257             }
1258 12 50       180 if ($self->verbose) {
1259 0         0 my $LFH = $self->_logfilehandle;
1260 0         0 print $LFH "DONE\n";
1261             }
1262             }
1263              
1264             sub _mirror_dlcollector {
1265 33     33   64 my($self,$xcoll,$pathdb,$recent_events) = @_;
1266 33         80 my $success = $self->mirror_path([map {$_->{rev}{path}} @$xcoll]);
  1372         1915  
1267 33 100       2500 if ($pathdb) {
1268 15         104 $self->_mirror_register_path($pathdb,[map {$_->{rev}} @$xcoll],"rsync");
  586         1226  
1269             }
1270 33         407 $self->done->register($recent_events, [map {$_->{i}} @$xcoll]);
  1372         2481  
1271 33         1733 @$xcoll = ();
1272 33         188 return $success;
1273             }
1274              
1275             sub _mirror_register_path {
1276 21     21   105 my($self,$pathdb,$coll,$activity) = @_;
1277 21         61 my $time = time;
1278 21         88 for my $item (@$coll) {
1279             $pathdb->{$item->{path}} =
1280             {
1281             recentepoch => $item->{epoch},
1282 592         4004 ($activity."_on") => $time,
1283             };
1284             }
1285             }
1286              
1287             sub _mirror_unhide_tempfile {
1288 29     29   111 my($self, $trecentfile) = @_;
1289 29         149 my $rfile = $self->rfile;
1290 29 50       3229 if (rename $trecentfile, $rfile) {
1291             # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
1292             } else {
1293 0         0 require Carp;
1294 0         0 Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
1295             }
1296 29         222 $self->_use_tempfile (0);
1297 29 100       248 if (my $ctfh = $self->_current_tempfile_fh) {
1298 10         313 $ctfh->unlink_on_destroy (0);
1299 10         325 $self->_current_tempfile_fh (undef);
1300             }
1301             }
1302              
1303             sub _mirror_perform_delayed_ops {
1304 29     29   12536 my($self,$options) = @_;
1305 29         152 my $delayed = $self->delayed_operations;
1306 29         79 for my $dst (keys %{$delayed->{unlink}}) {
  29         242  
1307 36 100       7459110 unless (unlink $dst) {
1308 18         156 require Carp;
1309 18 50       114 Carp::cluck ( "Warning: Error while unlinking '$dst': $!" ) if $options->{verbose};
1310             }
1311 36 50       390 if ($self->verbose) {
1312 0         0 my $doing = "Del";
1313 0         0 my $LFH = $self->_logfilehandle;
1314 0         0 printf $LFH
1315             (
1316             "%-4s %d (%s) %s DONE\n",
1317             $doing,
1318             time,
1319             $self->interval,
1320             $dst,
1321             );
1322 0         0 delete $delayed->{unlink}{$dst};
1323             }
1324             }
1325 29         277 for my $dst (sort {length($b) <=> length($a)} keys %{$delayed->{rmdir}}) {
  0         0  
  29         307  
1326 0 0       0 unless (rmdir $dst) {
1327 0         0 require Carp;
1328 0 0       0 Carp::cluck ( "Warning: Error on rmdir '$dst': $!" ) if $options->{verbose};
1329             }
1330 0 0       0 if ($self->verbose) {
1331 0         0 my $doing = "Del";
1332 0         0 my $LFH = $self->_logfilehandle;
1333 0         0 printf $LFH
1334             (
1335             "%-4s %d (%s) %s DONE\n",
1336             $doing,
1337             time,
1338             $self->interval,
1339             $dst,
1340             );
1341 0         0 delete $delayed->{rmdir}{$dst};
1342             }
1343             }
1344             }
1345              
1346             =head2 $success = $obj->mirror_path ( $arrref | $path )
1347              
1348             If the argument is a scalar it is treated as a path. The remote path
1349             is mirrored into the local copy. $path is the path found in the
1350             I, i.e. it is relative to the root directory of the
1351             mirror.
1352              
1353             If the argument is an array reference then all elements are treated as
1354             a path below the current tree and all are rsynced with a single
1355             command (and a single connection).
1356              
1357             =cut
1358              
1359             sub mirror_path {
1360 33     33 1 86 my($self,$path) = @_;
1361             # XXX simplify the two branches such that $path is treated as
1362             # [$path] maybe even demand the argument as an arrayref to
1363             # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1364             # interface)
1365 33 50 33     315 if (ref $path and ref $path eq "ARRAY") {
1366 33         81 my $dst = $self->localroot;
1367 33         4220 mkpath dirname $dst;
1368 33         179 my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
1369             lc $self->filenameroot,
1370             ),
1371             TMPDIR => 1,
1372             UNLINK => 0,
1373             );
1374 33         21435 for my $p (@$path) {
1375 1372         4476 print $fh $p, "\n";
1376             }
1377 33         1741 $fh->flush;
1378 33         166 $fh->unlink_on_destroy(1);
1379 33         509 my $gaveup = 0;
1380 33         58 my $retried = 0;
1381 33         459 local($ENV{LANG}) = "C";
1382 33         196 while (!$self->rsync->exec
1383             (
1384             src => join("/",
1385             $self->remoteroot,
1386             ),
1387             dst => $dst,
1388             'files-from' => $fh->filename,
1389             )) {
1390 0         0 my(@err) = $self->rsync->err;
1391 0 0 0     0 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1392 0 0       0 if ($self->verbose) {
1393 0         0 my $LFH = $self->_logfilehandle;
1394 0         0 print $LFH "Info: ignoring link_stat error '@err'";
1395             }
1396 0         0 return 1;
1397             }
1398 0         0 $self->register_rsync_error (@err);
1399 0 0       0 if (++$retried >= 3) {
1400 0         0 my $batchsize = @$path;
1401 0         0 warn "The number of rsync retries now reached 3 within a batch of size $batchsize. Error was '@err'. Giving up now, will retry later, ";
1402 0         0 $gaveup = 1;
1403 0         0 last;
1404             }
1405 0         0 sleep 1;
1406             }
1407 33 50       51146875 unless ($gaveup) {
1408 33         426 $self->un_register_rsync_error ();
1409             }
1410             } else {
1411 0         0 my $dst = $self->local_path($path);
1412 0         0 mkpath dirname $dst;
1413 0         0 local($ENV{LANG}) = "C";
1414 0         0 while (!$self->rsync->exec
1415             (
1416             src => join("/",
1417             $self->remoteroot,
1418             $path
1419             ),
1420             dst => $dst,
1421             )) {
1422 0         0 my(@err) = $self->rsync->err;
1423 0 0 0     0 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1424 0 0       0 if ($self->verbose) {
1425 0         0 my $LFH = $self->_logfilehandle;
1426 0         0 print $LFH "Info: ignoring link_stat error '@err'";
1427             }
1428 0         0 return 1;
1429             }
1430 0         0 $self->register_rsync_error (@err);
1431             }
1432 0         0 $self->un_register_rsync_error ();
1433             }
1434 33         158933 return 1;
1435             }
1436              
1437             sub _my_ignore_link_stat_errors {
1438 0     0   0 my($self) = @_;
1439 0         0 my $x = $self->ignore_link_stat_errors;
1440 0 0       0 $x = 1 unless defined $x;
1441 0         0 return $x;
1442             }
1443              
1444             sub _my_current_rfile {
1445 7920     7920   9860 my($self) = @_;
1446 7920         6941 my $rfile;
1447 7920 100       14236 if ($self->_use_tempfile) {
1448 33         169 $rfile = $self->_current_tempfile;
1449             }
1450 7920 100 66     35728 unless ($rfile && -s $rfile) {
1451 7887         11686 $rfile = $self->rfile;
1452             }
1453 7920         16929 return $rfile;
1454             }
1455              
1456             =head2 $path = $obj->naive_path_normalize ($path)
1457              
1458             Takes an absolute unix style path as argument and canonicalizes it to
1459             a shorter path if possible, removing things like double slashes or
1460             C and removes references to C<../> directories to get a shorter
1461             unambiguos path. This is used to make the code easier that determines
1462             if a file passed to C is indeed below our C.
1463              
1464             =cut
1465              
1466             sub naive_path_normalize {
1467 1549     1549 1 2017 my($self,$path) = @_;
1468 1549         10655 $path =~ s|/+|/|g;
1469 1549         5005 1 while $path =~ s|/[^/]+/\.\./|/|;
1470 1549         2142 $path =~ s|/$||;
1471 1549         2242 $path;
1472             }
1473              
1474             =head2 $ret = $obj->read_recent_1 ( $data )
1475              
1476             Delegate of C on protocol 1
1477              
1478             =cut
1479              
1480             sub read_recent_1 {
1481 7769     7769 1 7459 my($self, $data) = @_;
1482 7769         11400 return $data->{recent};
1483             }
1484              
1485             =head2 $array_ref = $obj->recent_events ( %options )
1486              
1487             Note: the code relies on the resource being written atomically. We
1488             cannot lock because we may have no write access. If the caller has
1489             write access (eg. aggregate() or update()), it has to care for any
1490             necessary locking and it MUST write atomically.
1491              
1492             If C<$options{after}> is specified, only file events after this
1493             timestamp are returned.
1494              
1495             If C<$options{before}> is specified, only file events before this
1496             timestamp are returned.
1497              
1498             If C<$options{max}> is specified only a maximum of this many most
1499             recent events is returned.
1500              
1501             If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1502             will be returned.
1503              
1504             If C<$options{contains}> is specified the value must be a hash
1505             reference containing a query. The query may contain the keys C,
1506             C, and C. Each represents a condition that must be met. If
1507             there is more than one such key, the conditions are ANDed.
1508              
1509             If C<$options{info}> is specified, it must be a hashref. This hashref
1510             will be filled with metadata about the unfiltered recent_events of
1511             this object, in key C there is the first item, in key C
1512             is the last.
1513              
1514             =cut
1515              
1516             sub recent_events {
1517 7901     7901 1 149126 my ($self, %options) = @_;
1518 7901         9600 my $info = $options{info};
1519 7901 100       17085 if ($self->is_slave) {
1520             # XXX seems dubious, might produce tempfiles without removing them?
1521 38         304 $self->get_remote_recentfile_as_tempfile;
1522             }
1523 7901 50       35916 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1524 7901 100       104028 -e $rfile_or_tempfile or return [];
1525 7769         17262 my $suffix = $self->serializer_suffix;
1526 7769         27053 my ($data) = eval {
1527 7769         14004 $self->_try_deserialize
1528             (
1529             $suffix,
1530             $rfile_or_tempfile,
1531             );
1532             };
1533 7769         4866696 my $err = $@;
1534 7769 50 33     40339 if ($err or !$data) {
1535 0         0 return [];
1536             }
1537 7769         6417 my $re;
1538 7769 50       28208 if (reftype $data eq 'ARRAY') { # protocol 0
1539 0         0 $re = $data;
1540             } else {
1541 7769         21087 $re = $self->_recent_events_protocol_x
1542             (
1543             $data,
1544             $rfile_or_tempfile,
1545             );
1546             }
1547 7769 100       14054 return $re unless grep {defined $options{$_}} qw(after before contains max skip-deletes);
  38845         79568  
1548 2406         5520 $self->_recent_events_handle_options ($re, \%options);
1549             }
1550              
1551             # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
1552             sub _recent_events_handle_options {
1553 2406     2406   2568 my($self, $re, $options) = @_;
1554 2406         2856 my $last_item = $#$re;
1555 2406         2844 my $info = $options->{info};
1556 2406 100       4254 if ($info) {
1557 2400         4056 $info->{first} = $re->[0];
1558 2400         3978 $info->{last} = $re->[-1];
1559             }
1560 2406 100       4146 if (defined $options->{after}) {
1561 6 50       108 if ($re->[0]{epoch} > $options->{after}) {
1562 6 50       174 if (
1563             my $f = first
1564 150     150   270 {$re->[$_]{epoch} <= $options->{after}}
1565             0..$#$re
1566             ) {
1567 6         36 $last_item = $f-1;
1568             }
1569             } else {
1570 0         0 $last_item = -1;
1571             }
1572             }
1573 2406         2310 my $first_item = 0;
1574 2406 100       4104 if (defined $options->{before}) {
1575 2400 100       8652 if ($re->[0]{epoch} > $options->{before}) {
1576 2184 100       17640 if (
1577             my $f = first
1578 175812     175812   219540 {$re->[$_]{epoch} < $options->{before}}
1579             0..$last_item
1580             ) {
1581 624         1320 $first_item = $f;
1582             }
1583             } else {
1584 216         498 $first_item = 0;
1585             }
1586             }
1587 2406 50 66     18102 if (0 != $first_item || -1 != $last_item) {
1588 2406         24738 @$re = splice @$re, $first_item, 1+$last_item-$first_item;
1589             }
1590 2406 50       9438 if ($options->{'skip-deletes'}) {
1591 0         0 @$re = grep { $_->{type} ne "delete" } @$re;
  0         0  
1592             }
1593 2406 50       4572 if (my $contopt = $options->{contains}) {
1594 0         0 my $seen_allowed = 0;
1595 0         0 for my $allow (qw(epoch path type)) {
1596 0 0       0 if (exists $contopt->{$allow}) {
1597 0         0 $seen_allowed++;
1598 0         0 my $v = $contopt->{$allow};
1599 0         0 @$re = grep { $_->{$allow} eq $v } @$re;
  0         0  
1600             }
1601             }
1602 0 0       0 if (keys %$contopt > $seen_allowed) {
1603 0         0 require Carp;
1604 0         0 Carp::confess
1605             (sprintf "unknown query: %s", join ", ", %$contopt);
1606             }
1607             }
1608 2406 50 33     4734 if ($options->{max} && @$re > $options->{max}) {
1609 0         0 @$re = splice @$re, 0, $options->{max};
1610             }
1611 2406         15828 $re;
1612             }
1613              
1614             sub _recent_events_protocol_x {
1615 7769     7769   10120 my($self,
1616             $data,
1617             $rfile_or_tempfile,
1618             ) = @_;
1619 7769         34988 my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
1620             # we may be reading meta for the first time
1621 7769         7499 while (my($k,$v) = each %{$data->{meta}}) {
  81989         315583  
1622 74220 100       98900 if ($k ne lc $k){ # "Producers"
1623 7769         17166 $self->{ORIG}{$k} = $v;
1624 7769         11679 next;
1625             }
1626 66451 100       117625 next if defined $self->$k;
1627 12541         37953 $self->$k($v);
1628             }
1629 7769         17037 my $re = $self->$meth ($data);
1630 7769         5687 my $minmax;
1631 7769 50       112474 if (my @stat = stat $rfile_or_tempfile) {
1632 7769         20627 $minmax = { mtime => $stat[9] };
1633             } else {
1634             # defensive because ABH encountered:
1635              
1636             #### Sync 1239828608 (1/1/Z) temp .../authors/.FRMRecent-RECENT-Z.yaml-
1637             #### Ydr_.yaml ... DONE
1638             #### Cannot stat '/mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-
1639             #### Ydr_.yaml': No such file or directory at /usr/lib/perl5/site_perl/
1640             #### 5.8.8/File/Rsync/Mirror/Recentfile.pm line 1558.
1641             #### unlink0: /mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-Ydr_.yaml is
1642             #### gone already at cpan-pause.pl line 0
1643            
1644 0         0 my $LFH = $self->_logfilehandle;
1645 0         0 print $LFH "Warning (maybe harmless): Cannot stat '$rfile_or_tempfile': $!"
1646             }
1647 7769 50       15964 if (@$re) {
1648 7769         14721 $minmax->{min} = $re->[-1]{epoch};
1649 7769         9948 $minmax->{max} = $re->[0]{epoch};
1650             }
1651 7769         15566 $self->minmax ( $minmax );
1652 7769         32736 return $re;
1653             }
1654              
1655             sub _try_deserialize {
1656 7769     7769   11970 my($self,
1657             $suffix,
1658             $rfile_or_tempfile,
1659             ) = @_;
1660 7769 50       16481 if ($suffix eq ".yaml") {
    0          
1661 7769         39509 require YAML::Syck;
1662 7769         20861 YAML::Syck::LoadFile($rfile_or_tempfile);
1663             } elsif ($HAVE->{"Data::Serializer"}) {
1664             my $serializer = Data::Serializer->new
1665 0         0 ( serializer => $serializers{$suffix} );
1666             my $serialized = do
1667 0         0 {
1668 0 0       0 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1669 0         0 local $/;
1670 0         0 <$fh>;
1671             };
1672 0         0 $serializer->raw_deserialize($serialized);
1673             } else {
1674 0         0 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1675             }
1676             }
1677              
1678             sub _refresh_internals {
1679 43     43   492 my($self, $dst) = @_;
1680 43         272 my $class = ref $self;
1681 43         1403 my $rfpeek = $class->new_from_file ($dst);
1682 43         253 for my $acc (qw(
1683             _merged
1684             minmax
1685             )) {
1686 86         737 $self->$acc ( $rfpeek->$acc );
1687             }
1688 43         810 my $old_dirtymark = $self->dirtymark;
1689 43         403 my $new_dirtymark = $rfpeek->dirtymark;
1690 43 100 66     855 if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
      100        
1691 5         50 $self->done->reset;
1692 5         27 $self->dirtymark ( $new_dirtymark );
1693 5         35 $self->_uptodateness_ever_reached(0);
1694 5         50 $self->seed;
1695             }
1696             }
1697              
1698             =head2 $ret = $obj->rfilename
1699              
1700             Just the basename of our I, composed from C,
1701             a dash, C, and C. E.g. C
1702              
1703             =cut
1704              
1705             sub rfilename {
1706 8004     8004 1 25279 my($self) = @_;
1707 8004         13178 my $file = sprintf("%s-%s%s",
1708             $self->filenameroot,
1709             $self->interval,
1710             $self->serializer_suffix,
1711             );
1712 8004         97374 return $file;
1713             }
1714              
1715             =head2 $str = $self->remote_dir
1716              
1717             The directory we are mirroring from.
1718              
1719             =cut
1720              
1721             sub remote_dir {
1722 18     18 1 66 my($self, $set) = @_;
1723 18 100       54 if (defined $set) {
1724 6         24 $self->_remote_dir ($set);
1725             }
1726 18         48 my $x = $self->_remote_dir;
1727 18         72 $self->is_slave (1);
1728 18         72 return $x;
1729             }
1730              
1731             =head2 $str = $obj->remoteroot
1732              
1733             =head2 (void) $obj->remoteroot ( $set )
1734              
1735             Get/Set the composed prefix needed when rsyncing from a remote module.
1736             If remote_host, remote_module, and remote_dir are set, it is composed
1737             from these.
1738              
1739             =cut
1740              
1741             sub remoteroot {
1742 92     92 1 339 my($self, $set) = @_;
1743 92 100       446 if (defined $set) {
1744 16         104 $self->_remoteroot($set);
1745             }
1746 92         572 my $remoteroot = $self->_remoteroot;
1747 92 100       782 unless (defined $remoteroot) {
1748 6 50       24 $remoteroot = sprintf
    50          
    50          
1749             (
1750             "%s%s%s",
1751             defined $self->remote_host ? ($self->remote_host."::") : "",
1752             defined $self->remote_module ? ($self->remote_module."/") : "",
1753             defined $self->remote_dir ? $self->remote_dir : "",
1754             );
1755 6         24 $self->_remoteroot($remoteroot);
1756             }
1757 92         470 return $remoteroot;
1758             }
1759              
1760             =head2 (void) $obj->split_rfilename ( $recentfilename )
1761              
1762             Inverse method to C. C<$recentfilename> is a plain filename
1763             of the pattern
1764              
1765             $filenameroot-$interval$serializer_suffix
1766              
1767             e.g.
1768              
1769             RECENT-1M.yaml
1770              
1771             This filename is split into its parts and the parts are fed to the
1772             object itself.
1773              
1774             =cut
1775              
1776             sub split_rfilename {
1777 6     6 1 18 my($self, $rfname) = @_;
1778 6         54 my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
1779 6 50       96 if (my($f,$i,$s) = $rfname =~ $splitter) {
1780 6         42 $self->filenameroot ($f);
1781 6         66 $self->interval ($i);
1782 6         24 $self->serializer_suffix ($s);
1783             } else {
1784 0         0 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1785             }
1786 6         72 return;
1787             }
1788              
1789             =head2 my $rfile = $obj->rfile
1790              
1791             Returns the full path of the I
1792              
1793             =cut
1794              
1795             sub rfile {
1796 19266     19266 1 17010 my($self) = @_;
1797 19266         28751 my $rfile = $self->_rfile;
1798 19266 100       70968 return $rfile if defined $rfile;
1799 5970         12905 $rfile = File::Spec->catfile
1800             ($self->localroot,
1801             $self->rfilename,
1802             );
1803 5970         14794 $self->_rfile ($rfile);
1804 5970         20658 return $rfile;
1805             }
1806              
1807             =head2 $rsync_obj = $obj->rsync
1808              
1809             The File::Rsync object that this object uses for communicating with an
1810             upstream server.
1811              
1812             =cut
1813              
1814             sub rsync {
1815 76     76 1 154 my($self) = @_;
1816 76         344 my $rsync = $self->_rsync;
1817 76 100       540 unless (defined $rsync) {
1818 29   50     99 my $rsync_options = $self->rsync_options || {};
1819 29 50       289 if ($HAVE->{"File::Rsync"}) {
1820 29         440 $rsync = File::Rsync->new($rsync_options);
1821 29         79874 $self->_rsync($rsync);
1822             } else {
1823 0         0 die "File::Rsync required for rsync operations. Cannot continue";
1824             }
1825             }
1826 76         934 return $rsync;
1827             }
1828              
1829             =head2 (void) $obj->register_rsync_error(@err)
1830              
1831             =head2 (void) $obj->un_register_rsync_error()
1832              
1833             Register_rsync_error is called whenever the File::Rsync object fails
1834             on an exec (say, connection doesn't succeed). It issues a warning and
1835             sleeps for an increasing amount of time. Un_register_rsync_error
1836             resets the error count. See also accessor C.
1837              
1838             =cut
1839              
1840             {
1841             my $no_success_count = 0;
1842             my $no_success_time = 0;
1843             sub register_rsync_error {
1844 0     0 1 0 my($self, @err) = @_;
1845 0         0 chomp @err;
1846 0         0 $no_success_time = time;
1847 0         0 $no_success_count++;
1848 0         0 my $max_rsync_errors = $self->max_rsync_errors;
1849 0 0       0 $max_rsync_errors = MAX_INT unless defined $max_rsync_errors;
1850 0 0 0     0 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1851 0         0 require Carp;
1852 0         0 Carp::confess
1853             (
1854             sprintf
1855             (
1856             "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1857             $self->interval,
1858             join(" ",@err),
1859             $no_success_count,
1860             ));
1861             }
1862 0         0 my $sleep = 12 * $no_success_count;
1863 0 0       0 $sleep = 300 if $sleep > 300;
1864 0         0 require Carp;
1865 0         0 Carp::cluck
1866             (sprintf
1867             (
1868             "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1869             scalar(localtime($no_success_time)),
1870             $self->interval,
1871             join(" ",@err),
1872             $sleep,
1873             ));
1874 0         0 sleep $sleep
1875             }
1876             sub un_register_rsync_error {
1877 76     76 1 273 my($self) = @_;
1878 76         185 $no_success_time = 0;
1879 76         975 $no_success_count = 0;
1880             }
1881             }
1882              
1883             =head2 $clone = $obj->_sparse_clone
1884              
1885             Clones just as much from itself that it does not hurt. Experimental
1886             method.
1887              
1888             Note: what fits better: sparse or shallow? Other suggestions?
1889              
1890             =cut
1891              
1892             sub _sparse_clone {
1893 3864     3864   4151 my($self) = @_;
1894 3864         12290 my $new = bless {}, ref $self;
1895 3864         6826 for my $m (qw(
1896             _interval
1897             _localroot
1898             _remoteroot
1899             _rfile
1900             _use_tempfile
1901             aggregator
1902             filenameroot
1903             ignore_link_stat_errors
1904             is_slave
1905             max_files_per_connection
1906             protocol
1907             rsync_options
1908             serializer_suffix
1909             sleep_per_connection
1910             tempdir
1911             verbose
1912             )) {
1913 61824         205905 my $o = $self->$m;
1914 61824 100       271866 $o = Storable::dclone $o if ref $o;
1915 61824         88079 $new->$m($o);
1916             }
1917 3864         17772 $new;
1918             }
1919              
1920             =head2 $boolean = OBJ->ttl_reached ()
1921              
1922             =cut
1923              
1924             sub ttl_reached {
1925 43     43 1 102 my($self) = @_;
1926 43   100     147 my $have_mirrored = $self->have_mirrored || 0;
1927 43         387 my $now = Time::HiRes::time;
1928 43         148 my $ttl = $self->ttl;
1929 43 50       243 $ttl = 24.2 unless defined $ttl;
1930 43 100       193 if ($now > $have_mirrored + $ttl) {
1931 10         50 return 1;
1932             }
1933 33         109 return 0;
1934             }
1935              
1936             =head2 (void) $obj->unlock()
1937              
1938             Unlocking is implemented with an C on a locking directory
1939             (C<.lock> appended to $rfile).
1940              
1941             =cut
1942              
1943             sub unlock {
1944 9951     9951 1 10064 my($self) = @_;
1945 9951 100       19862 return unless $self->_is_locked;
1946 3697         17228 my $rfile = $self->rfile;
1947 3697 50       487965 unlink "$rfile.lock/process" or warn "Could not unlink lockfile '$rfile.lock/process': $!";
1948 3697 50       535693 rmdir "$rfile.lock" or warn "Could not rmdir lockdir '$rfile.lock': $!";;
1949 3697         10999 $self->_is_locked (0);
1950             }
1951              
1952             =head2 unseed
1953              
1954             Sets this recentfile in the state of not 'seeded'.
1955              
1956             =cut
1957             sub unseed {
1958 72     72 1 161 my($self) = @_;
1959 72         422 $self->seeded(0);
1960             }
1961              
1962             =head2 $ret = $obj->update ($path, $type)
1963              
1964             =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1965              
1966             =head2 $ret = $obj->update ()
1967              
1968             Enter one file into the local I. $path is the (usually
1969             absolute) path. If the path is outside I tree, then it is
1970             ignored.
1971              
1972             C<$type> is one of C or C.
1973              
1974             Events of type C may set $dirty_epoch. $dirty_epoch is normally
1975             not used and the epoch is calculated by the update() routine itself
1976             based on current time. But if there is the demand to insert a
1977             not-so-current file into the dataset, then the caller sets
1978             $dirty_epoch. This causes the epoch of the registered event to become
1979             $dirty_epoch or -- if the exact value given is already taken -- a tiny
1980             bit more. As compensation the dirtymark of the whole dataset is set to
1981             now or the current epoch, whichever is higher. Note: setting the
1982             dirty_epoch to the future is prohibited as it's very unlikely to be
1983             intended: it definitely might wreak havoc with the index files.
1984              
1985             The new file event is unshifted (or, if dirty_epoch is set, inserted
1986             at the place it belongs to, according to the rule to have a sequence
1987             of strictly decreasing timestamps) to the array of recent_events and
1988             the array is shortened to the length of the timespan allowed. This is
1989             usually the timespan specified by the interval of this recentfile but
1990             as long as this recentfile has not been merged to another one, the
1991             timespan may grow without bounds.
1992              
1993             The third form runs an update without inserting a new file. This may
1994             be desired to truncate a recentfile.
1995              
1996             =cut
1997             sub _epoch_monotonically_increasing {
1998 1935     1935   3116 my($self,$epoch,$recent) = @_;
1999 1935 100       4083 return $epoch unless @$recent; # the first one goes unoffended
2000 1893 100       20478 if (_bigfloatgt("".$epoch,$recent->[0]{epoch})) {
2001 1695         3280 return $epoch;
2002             } else {
2003 198         498 return _increase_a_bit($recent->[0]{epoch});
2004             }
2005             }
2006             sub update {
2007 1973     1973 1 371598 my($self,$path,$type,$dirty_epoch) = @_;
2008 1973 50 66     8165 if (defined $path or defined $type or defined $dirty_epoch) {
      66        
2009 1549 50       3298 die "update called without path argument" unless defined $path;
2010 1549 50       2991 die "update called without type argument" unless defined $type;
2011 1549 50       12330 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
2012             }
2013 1973         4906 $self->lock;
2014 1973         22786 my $ctx = $self->_locked_batch_update([{path=>$path,type=>$type,epoch=>$dirty_epoch}]);
2015 1973 100       9782 $self->write_recent($ctx->{recent}) if $ctx->{something_done};
2016 1973         7012 $self->_assert_symlink;
2017 1973         5582 $self->unlock;
2018             }
2019              
2020             =head2 $obj->batch_update($batch)
2021              
2022             Like update but for many files. $batch is an arrayref containing
2023             hashrefs with the structure
2024              
2025             {
2026             path => $path,
2027             type => $type,
2028             epoch => $epoch,
2029             }
2030              
2031              
2032              
2033             =cut
2034             sub batch_update {
2035 0     0 1 0 my($self,$batch) = @_;
2036 0         0 $self->lock;
2037 0         0 my $ctx = $self->_locked_batch_update($batch);
2038 0 0       0 $self->write_recent($ctx->{recent}) if $ctx->{something_done};
2039 0         0 $self->_assert_symlink;
2040 0         0 $self->unlock;
2041             }
2042             sub _locked_batch_update {
2043 1973     1973   3132 my($self,$batch) = @_;
2044 1973         2186 my $something_done = 0;
2045 1973         4551 my $recent = $self->recent_events;
2046 1973 100       5075 unless ($recent->[0]) {
2047             # obstetrics
2048 42         42 $something_done = 1;
2049             }
2050 1973         3627 my %paths_in_recent = map { $_->{path} => undef } @$recent;
  75303         102937  
2051 1973         7122 my $interval = $self->interval;
2052 1973         3980 my $canonmeth = $self->canonize;
2053 1973 100       7625 unless ($canonmeth) {
2054 468         534 $canonmeth = "naive_path_normalize";
2055             }
2056 1973         1949 my $oldest_allowed = 0;
2057 1973         2163 my $setting_new_dirty_mark = 0;
2058 1973         1898 my $console;
2059 1973 50 66     4086 if ($self->verbose && @$batch > 1) {
2060 0         0 eval {require Time::Progress};
  0         0  
2061 0 0       0 warn "dollarat[$@]" if $@;
2062 0         0 $| = 1;
2063 0         0 $console = new Time::Progress;
2064 0         0 $console->attr( min => 1, max => scalar @$batch );
2065 0         0 print "\n";
2066             }
2067 1973         8069 my $i = 0;
2068 1973         1727 my $memo_splicepos;
2069 1973   0     5175 ITEM: for my $item (sort {($b->{epoch}||0) <=> ($a->{epoch}||0)} @$batch) {
  0   0     0  
2070 1973         1747 $i++;
2071 1973 50 33     4368 print $console->report( "\rdone %p elapsed: %L (%l sec), ETA %E (%e sec)", $i ) if $console and not $i % 50;
2072 1973         5226 my $ctx = $self->_update_batch_item($item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,\%paths_in_recent,$memo_splicepos);
2073 1973         2681 $something_done = $ctx->{something_done};
2074 1973         1836 $oldest_allowed = $ctx->{oldest_allowed};
2075 1973         1815 $setting_new_dirty_mark = $ctx->{setting_new_dirty_mark};
2076 1973         2452 $recent = $ctx->{recent};
2077 1973         6000 $memo_splicepos = $ctx->{memo_splicepos};
2078             }
2079 1973 50       4283 print "\n" if $console;
2080 1973 100       2810 if ($setting_new_dirty_mark) {
2081 38         60 $oldest_allowed = 0;
2082             }
2083 1973         4303 TRUNCATE: while (@$recent) {
2084 3157 100       7140 if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
2085 1184         1110 pop @$recent;
2086 1184         2834 $something_done = 1;
2087             } else {
2088 1973         3479 last TRUNCATE;
2089             }
2090             }
2091 1973         14132 return {something_done=>$something_done,recent=>$recent};
2092             }
2093             sub _update_batch_item {
2094 1973     1973   3554 my($self,$item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,$paths_in_recent,$memo_splicepos) = @_;
2095 1973         2370 my($path,$type,$dirty_epoch) = @{$item}{qw(path type epoch)};
  1973         4479  
2096 1973 50 66     6251 if (defined $path or defined $type or defined $dirty_epoch) {
      66        
2097 1549         3917 $path = $self->$canonmeth($path);
2098             }
2099             # you must calculate the time after having locked, of course
2100 1973         4459 my $now = Time::HiRes::time;
2101              
2102 1973         1755 my $epoch;
2103 1973 100 66     5078 if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
2104 38         80 $epoch = $dirty_epoch;
2105             } else {
2106 1935         4978 $epoch = $self->_epoch_monotonically_increasing($now,$recent);
2107             }
2108 1973   50     4043 $recent ||= [];
2109 1973         3867 my $merged = $self->merged;
2110 1973 100 66     7523 if ($merged->{epoch} && !$setting_new_dirty_mark) {
2111 1051         3677 my $virtualnow = _bigfloatmax($now,$epoch);
2112             # for the lower bound I think we need no big math, we calc already
2113 1051         2791 my $secs = $self->interval_secs();
2114 1051         5588 $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
2115             } else {
2116             # as long as we are not merged at all, no limits!
2117             }
2118 1973         3616 my $lrd = $self->localroot;
2119 1973 100 66     20601 if (defined $path && $path =~ s|^\Q$lrd\E||) {
2120 1549         3565 $path =~ s|^/||;
2121 1549         1948 my $splicepos;
2122             # remove the older duplicates of this $path, irrespective of $type:
2123 1549 100       2707 if (defined $dirty_epoch) {
2124 38         182 my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch,$paths_in_recent,$memo_splicepos);
2125 38         78 $recent = $ctx->{recent};
2126 38         52 $splicepos = $ctx->{splicepos};
2127 38         70 $epoch = $ctx->{epoch};
2128 38         116 my $dirtymark = $self->dirtymark;
2129 38         124 my $new_dm = $now;
2130 38 50       96 if (_bigfloatgt($epoch, $now)) { # just in case we had to increase it
2131 0         0 $new_dm = $epoch;
2132             }
2133 38         110 $self->dirtymark($new_dm);
2134 38         106 $setting_new_dirty_mark = 1;
2135 38 50 33     182 if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
2136 38         102 $self->merged(+{});
2137             }
2138             } else {
2139 1511         2172 $recent = [ grep { $_->{path} ne $path } @$recent ];
  56999         56323  
2140 1511         1938 $splicepos = 0;
2141             }
2142 1549 50       3240 if (defined $splicepos) {
2143 1549         8662 splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
2144 1549         3143 $paths_in_recent->{$path} = undef;
2145             }
2146 1549         1593 $memo_splicepos = $splicepos;
2147 1549         1555 $something_done = 1;
2148             }
2149             return
2150             {
2151 1973         10112 something_done => $something_done,
2152             oldest_allowed => $oldest_allowed,
2153             setting_new_dirty_mark => $setting_new_dirty_mark,
2154             recent => $recent,
2155             memo_splicepos => $memo_splicepos,
2156             }
2157             }
2158             sub _update_with_dirty_epoch {
2159 38     38   86 my($self,$path,$recent,$epoch,$paths_in_recent,$memo_splicepos) = @_;
2160 38         52 my $splicepos;
2161 38         84 my $new_recent = [];
2162 38 50       140 if (exists $paths_in_recent->{$path}) {
2163 0         0 my $cancel = 0;
2164 0         0 KNOWN_EVENT: for my $i (0..$#$recent) {
2165 0 0       0 if ($recent->[$i]{path} eq $path) {
2166 0 0       0 if ($recent->[$i]{epoch} eq $epoch) {
2167             # nothing to do
2168 0         0 $cancel = 1;
2169 0         0 last KNOWN_EVENT;
2170             }
2171             } else {
2172 0         0 push @$new_recent, $recent->[$i];
2173             }
2174             }
2175 0 0       0 @$recent = @$new_recent unless $cancel;
2176             }
2177 38 50 33     204 if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
    50          
2178 0         0 $splicepos = 0;
2179             } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
2180 38         70 $splicepos = @$recent;
2181             } else {
2182 0         0 my $startingpoint;
2183 0 0 0     0 if (_bigfloatgt($memo_splicepos<=$#$recent && $epoch, $recent->[$memo_splicepos]{epoch})) {
2184 0         0 $startingpoint = 0;
2185             } else {
2186 0         0 $startingpoint = $memo_splicepos;
2187             }
2188 0         0 RECENT: for my $i ($startingpoint..$#$recent) {
2189 0         0 my $ev = $recent->[$i];
2190 0 0       0 if ($epoch eq $recent->[$i]{epoch}) {
2191 0 0       0 $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
2192             }
2193 0 0       0 if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
2194 0         0 $splicepos = $i;
2195 0         0 last RECENT;
2196             }
2197             }
2198             }
2199             return {
2200 38         174 recent => $recent,
2201             splicepos => $splicepos,
2202             epoch => $epoch,
2203             }
2204             }
2205              
2206             =head2 seed
2207              
2208             Sets this recentfile in the state of 'seeded' which means it has to
2209             re-evaluate its uptodateness.
2210              
2211             =cut
2212             sub seed {
2213 30     30 1 87 my($self) = @_;
2214 30         131 $self->seeded(1);
2215             }
2216              
2217             =head2 seeded
2218              
2219             Tells if the recentfile is in the state 'seeded'.
2220              
2221             =cut
2222             sub seeded {
2223 147     147 1 641 my($self, $set) = @_;
2224 147 100       607 if (defined $set) {
2225 102         487 $self->_seeded ($set);
2226             }
2227 147         733 my $x = $self->_seeded;
2228 147 100       838 unless (defined $x) {
2229 8         29 $x = 0;
2230 8         29 $self->_seeded ($x);
2231             }
2232 147         563 return $x;
2233             }
2234              
2235             =head2 uptodate
2236              
2237             True if this object has mirrored the complete interval covered by the
2238             current recentfile.
2239              
2240             =cut
2241             sub uptodate {
2242 62     62 1 126 my($self) = @_;
2243 62         136 my $uptodate;
2244             my $why;
2245 62 100 66     302 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2246 21         83 $why = "saturated";
2247 21         50 $uptodate = 1;
2248             }
2249             # it's too easy to misconfigure ttl and related timings and then
2250             # never reach uptodateness, so disabled 2009-03-22
2251 62         282 if (0 and not defined $uptodate) {
2252             if ($self->ttl_reached){
2253             $why = "ttl_reached returned true, so we are not uptodate";
2254             $uptodate = 0 ;
2255             }
2256             }
2257 62 100       179 unless (defined $uptodate) {
2258             # look if recentfile has unchanged timestamp
2259 41         144 my $minmax = $self->minmax;
2260 41 100       280 if (exists $minmax->{mtime}) {
2261 18         96 my $rfile = $self->_my_current_rfile;
2262 18         337 my @stat = stat $rfile;
2263 18 50       78 if (@stat) {
2264 18         32 my $mtime = $stat[9];
2265 18 50 33     445 if (defined $mtime && defined $minmax->{mtime} && $mtime > $minmax->{mtime}) {
      33        
2266 0         0 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
2267 0         0 $uptodate = 0;
2268             } else {
2269 18         81 my $covered = $self->done->covered(@$minmax{qw(max min)});
2270 18 50       187 $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
2271 18         78 $uptodate = $covered;
2272             }
2273             } else {
2274 0         0 require Carp;
2275 0         0 $why = "Could not stat '$rfile': $!";
2276 0         0 Carp::cluck($why);
2277 0         0 $uptodate = 0;
2278             }
2279             }
2280             }
2281 62 100       228 unless (defined $uptodate) {
2282 23         40 $why = "fallthrough, so not uptodate";
2283 23         29 $uptodate = 0;
2284             }
2285 62 100       227 if ($uptodate) {
2286 36         173 $self->_uptodateness_ever_reached(1);
2287             }
2288 62         364 my $remember =
2289             {
2290             uptodate => $uptodate,
2291             why => $why,
2292             };
2293 62         231 $self->_remember_last_uptodate_call($remember);
2294 62         658 return $uptodate;
2295             }
2296              
2297             =head2 $obj->write_recent ($recent_files_arrayref)
2298              
2299             Writes a I based on the current reflection of the current
2300             state of the tree limited by the current interval.
2301              
2302             =cut
2303             sub _resort {
2304 0     0   0 my($self) = @_;
2305 0         0 @{$_[1]} = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @{$_[1]};
  0         0  
  0         0  
  0         0  
2306 0         0 return;
2307             }
2308             sub write_recent {
2309 3241     3241 1 5701 my ($self,$recent) = @_;
2310 3241 50       4935 die "write_recent called without argument" unless defined $recent;
2311 3241         2623 my $Last_epoch;
2312 3241         7798 SANITYCHECK: for my $i (0..$#$recent) {
2313 131355 50 66     275950 if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
2314 0         0 require Carp;
2315             Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
2316 0         0 $recent->[$i]{epoch}, $Last_epoch, $self->interval);
2317             # you may want to:
2318             # $self->_resort($recent);
2319             # last SANITYCHECK;
2320             }
2321 131355         157186 $Last_epoch = $recent->[$i]{epoch};
2322             }
2323 3241         8012 my $minmax = $self->minmax;
2324 3241 100 100     20006 if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
2325 1999 50 33     8980 $minmax->{max} = @$recent && exists $recent->[0]{epoch} ? $recent->[0]{epoch} : undef;
2326             }
2327 3241 100 100     11178 if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
2328 626 50 33     2544 $minmax->{min} = @$recent && exists $recent->[-1]{epoch} ? $recent->[-1]{epoch} : undef;
2329             }
2330 3241         7123 $self->minmax($minmax);
2331 3241         12316 my $meth = sprintf "write_%d", $self->protocol;
2332 3241         22214 $self->$meth($recent);
2333             }
2334              
2335             =head2 $obj->write_0 ($recent_files_arrayref)
2336              
2337             Delegate of C on protocol 0
2338              
2339             =cut
2340              
2341             sub write_0 {
2342 0     0 1 0 my ($self,$recent) = @_;
2343 0         0 my $rfile = $self->rfile;
2344 0         0 YAML::Syck::DumpFile("$rfile.new",$recent);
2345 0 0       0 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2346             }
2347              
2348             =head2 $obj->write_1 ($recent_files_arrayref)
2349              
2350             Delegate of C on protocol 1
2351              
2352             =cut
2353              
2354             sub write_1 {
2355 3241     3241 1 4424 my ($self,$recent) = @_;
2356 3241         5592 my $rfile = $self->rfile;
2357 3241         6276 my $suffix = $self->serializer_suffix;
2358 3241         12034 my $data = {
2359             meta => $self->meta_data,
2360             recent => $recent,
2361             };
2362 3241         3478 my $serialized;
2363 3241 100       5382 if ($suffix eq ".yaml") {
    50          
2364 3223         8785 $serialized = YAML::Syck::Dump($data);
2365             } elsif ($HAVE->{"Data::Serializer"}) {
2366             my $serializer = Data::Serializer->new
2367 18         120 ( serializer => $serializers{$suffix} );
2368 18         12894 $serialized = $serializer->raw_serialize($data);
2369             } else {
2370 0         0 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
2371             }
2372 3241 50       1220909 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2373 3241         66578 print $fh $serialized;
2374 3241 50       82632 close $fh or die "Could not close '$rfile.new': $!";
2375 3241 50       1309540 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2376             }
2377              
2378             BEGIN {
2379 9     9   79491 my $nq = qr/[^"]+/; # non-quotes
2380 9         62 my @pod_lines =
2381 9         41 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
  36         298  
  36         836  
  153         259  
2382              
2383             =head1 SERIALIZERS
2384              
2385             The following suffixes are supported and trigger the use of these
2386             serializers:
2387              
2388             =over 4
2389              
2390             =item C<< ".yaml" => "YAML::Syck" >>
2391              
2392             =item C<< ".json" => "JSON" >>
2393              
2394             =item C<< ".sto" => "Storable" >>
2395              
2396             =item C<< ".dd" => "Data::Dumper" >>
2397              
2398             =back
2399              
2400             =cut
2401              
2402             BEGIN {
2403 9     9   70 my @pod_lines =
2404 9         17 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
  72         2325  
  270         522  
2405              
2406             =head1 INTERVAL SPEC
2407              
2408             An interval spec is a primitive way to express time spans. Normally it
2409             is composed from an integer and a letter.
2410              
2411             As a special case, a string that consists only of the single letter
2412             C, stands for MAX_INT seconds.
2413              
2414             The following letters express the specified number of seconds:
2415              
2416             =over 4
2417              
2418             =item C<< s => 1 >>
2419              
2420             =item C<< m => 60 >>
2421              
2422             =item C<< h => 60*60 >>
2423              
2424             =item C<< d => 60*60*24 >>
2425              
2426             =item C<< W => 60*60*24*7 >>
2427              
2428             =item C<< M => 60*60*24*30 >>
2429              
2430             =item C<< Q => 60*60*24*90 >>
2431              
2432             =item C<< Y => 60*60*24*365.25 >>
2433              
2434             =back
2435              
2436             =cut
2437              
2438             =head1 SEE ALSO
2439              
2440             L,
2441             L,
2442             L
2443              
2444             =head1 BUGS
2445              
2446             Please report any bugs or feature requests through the web interface
2447             at
2448             L.
2449             I will be notified, and then you'll automatically be notified of
2450             progress on your bug as I make changes.
2451              
2452             =head1 KNOWN BUGS
2453              
2454             Memory hungry: it seems all memory is allocated during the initial
2455             rsync where a list of all files is maintained in memory.
2456              
2457             =head1 SUPPORT
2458              
2459             You can find documentation for this module with the perldoc command.
2460              
2461             perldoc File::Rsync::Mirror::Recentfile
2462              
2463             You can also look for information at:
2464              
2465             =over 4
2466              
2467             =item * RT: CPAN's request tracker
2468              
2469             L
2470              
2471             =item * AnnoCPAN: Annotated CPAN documentation
2472              
2473             L
2474              
2475             =item * CPAN Ratings
2476              
2477             L
2478              
2479             =item * Search CPAN
2480              
2481             L
2482              
2483             =back
2484              
2485              
2486             =head1 ACKNOWLEDGEMENTS
2487              
2488             Thanks to RJBS for module-starter.
2489              
2490             =head1 AUTHOR
2491              
2492             Andreas König
2493              
2494             =head1 COPYRIGHT & LICENSE
2495              
2496             Copyright 2008,2009 Andreas König.
2497              
2498             This program is free software; you can redistribute it and/or modify it
2499             under the same terms as Perl itself.
2500              
2501              
2502             =cut
2503              
2504             1; # End of File::Rsync::Mirror::Recentfile
2505              
2506             # Local Variables:
2507             # mode: cperl
2508             # cperl-indent-level: 4
2509             # End: