File Coverage

blib/lib/File/Rsync/Mirror/Recentfile.pm
Criterion Covered Total %
statement 778 1008 77.1
branch 293 484 60.5
condition 109 184 59.2
subroutine 80 87 91.9
pod 39 39 100.0
total 1299 1802 72.0


line stmt bran cond sub pod time code
1             package File::Rsync::Mirror::Recentfile;
2              
3             # use warnings;
4 8     8   92025 use strict;
  8         35  
  8         513  
5              
6             =encoding utf-8
7              
8             =head1 NAME
9              
10             File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
11              
12             =cut
13              
14             my $HAVE = {};
15             for my $package (
16             "Data::Serializer",
17             "File::Rsync"
18             ) {
19             $HAVE->{$package} = eval qq{ require $package; };
20             }
21 8     8   48 use Config;
  8         13  
  8         337  
22 8     8   65 use File::Basename qw(basename dirname fileparse);
  8         19  
  8         661  
23 8     8   744 use File::Copy qw(cp);
  8         14956  
  8         612  
24 8     8   51 use File::Path qw(mkpath);
  8         15  
  8         387  
25 8     8   3326 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
  8         34  
  8         1774  
26 8     8   4803 use File::Temp;
  8         151178  
  8         898  
27 8     8   153 use List::Util qw(first max min);
  8         19  
  8         1001  
28 8     8   58 use Scalar::Util qw(reftype);
  8         14  
  8         382  
29 8     8   4076 use Storable;
  8         27921  
  8         576  
30 8     8   3605 use Time::HiRes qw();
  8         10175  
  8         242  
31 8     8   2963 use YAML::Syck;
  8         15865  
  8         802  
32              
33 8     8   84 use version; our $VERSION = qv('0.0.8');
  8         35  
  8         73  
34              
35 8     8   1244 use constant MAX_INT => ~0>>1; # anything better?
  8         19  
  8         560  
36 8     8   71 use constant DEFAULT_PROTOCOL => 1;
  8         108  
  8         6212  
37              
38             # cf. interval_secs
39             my %seconds;
40              
41             # maybe subclass if this mapping is bad?
42             my %serializers;
43              
44             =head1 SYNOPSIS
45              
46             Writer (of a single file):
47              
48             use File::Rsync::Mirror::Recentfile;
49             my $fr = File::Rsync::Mirror::Recentfile->new
50             (
51             interval => q(6h),
52             filenameroot => "RECENT",
53             comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54             localroot => "/home/ftp/pub/PAUSE/authors/",
55             aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
56             );
57             $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
58              
59             Reader/mirrorer:
60              
61             my $rf = File::Rsync::Mirror::Recentfile->new
62             (
63             filenameroot => "RECENT",
64             interval => q(6h),
65             localroot => "/home/ftp/pub/PAUSE/authors",
66             remote_dir => "",
67             remote_host => "pause.perl.org",
68             remote_module => "authors",
69             rsync_options => {
70             compress => 1,
71             'rsync-path' => '/usr/bin/rsync',
72             links => 1,
73             times => 1,
74             'omit-dir-times' => 1,
75             checksum => 1,
76             },
77             verbose => 1,
78             );
79             $rf->mirror;
80              
81             Aggregator (usually the writer):
82              
83             my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
84             $rf->aggregate;
85              
86             =head1 DESCRIPTION
87              
88             Lower level than F:R:M:Recent, handles one recentfile. Whereas a tree
89             is always composed of several recentfiles, controlled by the
90             F:R:M:Recent object. The Recentfile object has to do the bookkeeping
91             for a single timeslice.
92              
93             =head1 EXPORT
94              
95             No exports.
96              
97             =head1 CONSTRUCTORS / DESTRUCTOR
98              
99             =head2 my $obj = CLASS->new(%hash)
100              
101             Constructor. On every argument pair the key is a method name and the
102             value is an argument to that method name.
103              
104             If a recentfile for this resource already exists, metadata that are
105             not defined by the constructor will be fetched from there as soon as
106             it is being read by recent_events().
107              
108             =cut
109              
110             sub new {
111 542     542 1 1212156 my($class, @args) = @_;
112 542         3485 my $self = bless {}, $class;
113 542         4128 while (@args) {
114 1700         11260 my($method,$arg) = splice @args, 0, 2;
115 1700         11705 $self->$method($arg);
116             }
117 542 50       8300 unless (defined $self->protocol) {
118 542         5137 $self->protocol(DEFAULT_PROTOCOL);
119             }
120 542 100       7382 unless (defined $self->filenameroot) {
121 517         6234 $self->filenameroot("RECENT");
122             }
123 542 100       9376 unless (defined $self->serializer_suffix) {
124 522         5259 $self->serializer_suffix(".yaml");
125             }
126 542         5654 return $self;
127             }
128              
129             =head2 my $obj = CLASS->new_from_file($file)
130              
131             Constructor. $file is a I.
132              
133             =cut
134              
135             sub new_from_file {
136 1109     1109 1 811922 my($class, $file) = @_;
137 1109         5264 my $self = bless {}, $class;
138 1109         7214 $self->_rfile($file);
139             #?# $self->lock;
140 1109 50       13220 my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
  1109         69568  
141 1109         8586 local $/;
142 1109         56265 <$fh>;
143             };
144             # XXX: we can skip this step when the metadata are sufficient, but
145             # we cannot parse the file without some magic stuff about
146             # serialized formats
147 1109         18971 while (-l $file) {
148 31         1264 my($name,$path) = fileparse $file;
149 31         301 my $symlink = readlink $file;
150 31 50       197 if ($symlink =~ m|/|) {
151 0         0 die "FIXME: filenames containing '/' not supported, got $symlink";
152             }
153 31         971 $file = File::Spec->catfile ( $path, $symlink );
154             }
155 1109         136355 my($name,$path,$suffix) = fileparse $file, keys %serializers;
156 1109         12729 $self->serializer_suffix($suffix);
157 1109         11633 $self->localroot($path);
158 1109 50       10279 die "Could not determine file format from suffix" unless $suffix;
159 1109         2910 my $deserialized;
160 1109 50       5365 if ($suffix eq ".yaml") {
    0          
161 1109         11935 require YAML::Syck;
162 1109         6393 $deserialized = YAML::Syck::LoadFile($file);
163             } elsif ($HAVE->{"Data::Serializer"}) {
164             my $serializer = Data::Serializer->new
165 0         0 ( serializer => $serializers{$suffix} );
166 0         0 $deserialized = $serializer->raw_deserialize($serialized);
167             } else {
168 0         0 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
169             }
170 1109         1118578 while (my($k,$v) = each %{$deserialized->{meta}}) {
  13119         110565  
171 12010 100       42913 next if $k ne lc $k; # "Producers"
172 10901         46013 $self->$k($v);
173             }
174 1109 50       5471 unless (defined $self->protocol) {
175 0         0 $self->protocol(DEFAULT_PROTOCOL);
176             }
177 1109         41558 return $self;
178             }
179              
180             =head2 DESTROY
181              
182             A simple unlock.
183              
184             =cut
185             sub DESTROY {
186 5198     5198   101523471 my $self = shift;
187 5198         20709 $self->unlock;
188 5198 100       47990 unless ($self->_current_tempfile_fh) {
189 5194 100       38688 if (my $tempfile = $self->_current_tempfile) {
190 119 100       9381 if (-e $tempfile) {
191             # unlink $tempfile; # may fail in global destruction
192             }
193             }
194             }
195             }
196              
197             =head1 ACCESSORS
198              
199             =cut
200              
201             my @accessors;
202              
203             BEGIN {
204 8     8   73 @accessors = (
205             "_current_tempfile",
206             "_current_tempfile_fh",
207             "_delayed_operations",
208             "_done",
209             "_interval",
210             "_is_locked",
211             "_localroot",
212             "_merged",
213             "_pathdb",
214             "_remember_last_uptodate_call",
215             "_remote_dir",
216             "_remoteroot",
217             "_requires_fsck",
218             "_rfile",
219             "_rsync",
220             "__verified_tempdir",
221             "_seeded",
222             "_uptodateness_ever_reached",
223             "_use_tempfile",
224             );
225              
226 8         307 my @pod_lines =
227 8         36 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
  1152         3001  
228              
229             =over 4
230              
231             =item aggregator
232              
233             A list of interval specs that tell the aggregator which Is
234             are to be produced.
235              
236             =item canonize
237              
238             The name of a method to canonize the path before rsyncing. Only
239             supported value is C. Defaults to that.
240              
241             =item comment
242              
243             A comment about this tree and setup.
244              
245             =item dirtymark
246              
247             A timestamp. The dirtymark is updated whenever an out of band change
248             on the origin server is performed that violates the protocol. Say,
249             they add or remove files in the middle somewhere. Slaves must react
250             with a devaluation of their C structure which then leads to a
251             full re-sync of all files. Implementation note: dirtymark may increase
252             or decrease.
253              
254             =item filenameroot
255              
256             The (prefix of the) filename we use for this I. Defaults to
257             C. The string must not contain a directory separator.
258              
259             =item have_mirrored
260              
261             Timestamp remembering when we mirrored this recentfile the last time.
262             Only relevant for slaves.
263              
264             =item ignore_link_stat_errors
265              
266             If set to true, rsync errors are ignored that complain about link stat
267             errors. These seem to happen only when there are files missing at the
268             origin. In race conditions this can always happen, so it defaults to
269             true.
270              
271             =item is_slave
272              
273             If set to true, this object will fetch a new recentfile from remote
274             when the timespan between the last mirror (see have_mirrored) and now
275             is too large (see C).
276              
277             =item keep_delete_objects_forever
278              
279             The default for delete events is that they are passed through the
280             collection of recentfile objects until they reach the Z file. There
281             they get dropped so that the associated file object ceases to exist at
282             all. By setting C the delete objects are
283             kept forever. This makes the Z file larger but has the advantage that
284             slaves that have interrupted mirroring for a long time still can clean
285             up their copy.
286              
287             =item locktimeout
288              
289             After how many seconds shall we die if we cannot lock a I?
290             Defaults to 600 seconds.
291              
292             =item loopinterval
293              
294             When mirror_loop is called, this accessor can specify how much time
295             every loop shall at least take. If the work of a loop is done before
296             that time has gone, sleeps for the rest of the time. Defaults to
297             arbitrary 42 seconds.
298              
299             =item max_files_per_connection
300              
301             Maximum number of files that are transferred on a single rsync call.
302             Setting it higher means higher performance at the price of holding
303             connections longer and potentially disturbing other users in the pool.
304             Defaults to the arbitrary value 42.
305              
306             =item max_rsync_errors
307              
308             When rsync operations encounter that many errors without any resetting
309             success in between, then we die. Defaults to unlimited. A value of
310             -1 means we run forever ignoring all rsync errors.
311              
312             =item minmax
313              
314             Hashref remembering when we read the recent_events from this file the
315             last time and what the timespan was.
316              
317             =item protocol
318              
319             When the RECENT file format changes, we increment the protocol. We try
320             to support older protocols in later releases.
321              
322             =item remote_host
323              
324             The host we are mirroring from. Leave empty for the local filesystem.
325              
326             =item remote_module
327              
328             Rsync servers have so called modules to separate directory trees from
329             each other. Put here the name of the module under which we are
330             mirroring. Leave empty for local filesystem.
331              
332             =item rsync_options
333              
334             Things like compress, links, times or checksums. Passed in to the
335             File::Rsync object used to run the mirror.
336              
337             =item serializer_suffix
338              
339             Mostly untested accessor. The only well tested format for
340             Is at the moment is YAML. It is used with YAML::Syck via
341             Data::Serializer. But in principle other formats are supported as
342             well. See section SERIALIZERS below.
343              
344             =item sleep_per_connection
345              
346             Sleep that many seconds (floating point OK) after every chunk of rsyncing
347             has finished. Defaults to arbitrary 0.42.
348              
349             =item tempdir
350              
351             Directory to write temporary files to. Must allow rename operations
352             into the tree which usually means it must live on the same partition
353             as the target directory. Defaults to C<< $self->localroot >>.
354              
355             =item ttl
356              
357             Time to live. Number of seconds after which this recentfile must be
358             fetched again from the origin server. Only relevant for slaves.
359             Defaults to arbitrary 24.2 seconds.
360              
361             =item verbose
362              
363             Boolean to turn on a bit verbosity.
364              
365             =item verboselog
366              
367             Path to the logfile to write verbose progress information to. This is
368             a primitive stop gap solution to get simple verbose logging working.
369             Switching to Log4perl or similar is probably the way to go.
370              
371             =back
372              
373             =cut
374              
375 8     8   3047 use accessors @accessors;
  8         7362  
  8         44  
376              
377             =head1 METHODS
378              
379             =head2 (void) $obj->aggregate( %options )
380              
381             Takes all intervals that are collected in the accessor called
382             aggregator. Sorts them by actual length of the interval.
383             Removes those that are shorter than our own interval. Then merges this
384             object into the next larger object. The merging continues upwards
385             as long as the next I is old enough to warrant a merge.
386              
387             If a merge is warranted is decided according to the interval of the
388             previous interval so that larger files are not so often updated as
389             smaller ones. If $options{force} is true, all files get updated.
390              
391             Here is an example to illustrate the behaviour. Given aggregators
392              
393             1h 1d 1W 1M 1Q 1Y Z
394              
395             then
396              
397             1h updates 1d on every call to aggregate()
398             1d updates 1W earliest after 1h
399             1W updates 1M earliest after 1d
400             1M updates 1Q earliest after 1W
401             1Q updates 1Y earliest after 1M
402             1Y updates Z earliest after 1Q
403              
404             Note that all but the smallest recentfile get updated at an arbitrary
405             rate and as such are quite useless on their own.
406              
407             =cut
408              
409             sub aggregate {
410 354     354 1 18105737 my($self, %option) = @_;
411 354         1033 my %seen_interval;
412 2932         7140 my @aggs = sort { $a->{secs} <=> $b->{secs} }
413 1870 50       12124 grep { !$seen_interval{$_->{interval}}++ && $_->{secs} >= $self->interval_secs }
414 1870         17120 map { { interval => $_, secs => $self->interval_secs($_)} }
415 354 50       2498 $self->interval, @{$self->aggregator || []};
  354         1477  
416 354         5503 $self->update;
417 354         16342 $aggs[0]{object} = $self;
418 354         2011 AGGREGATOR: for my $i (0..$#aggs-1) {
419 986         5342 my $this = $aggs[$i]{object};
420 986         4827 my $next = $this->_sparse_clone;
421 986         6962 $next->interval($aggs[$i+1]{interval});
422 986         3517 my $want_merge = 0;
423 986 100 100     7600 if ($option{force} || $i == 0) {
424 606         1686 $want_merge = 1;
425             } else {
426 380         1545 my $next_rfile = $next->rfile;
427 380 100       7080 if (-e $next_rfile) {
428 320         1860 my $prev = $aggs[$i-1]{object};
429 320         2175 local $^T = time;
430 320         3115 my $next_age = 86400 * -M $next_rfile;
431 320 100       2335 if ($next_age > $prev->interval_secs) {
432 55         195 $want_merge = 1;
433             }
434             } else {
435 60         205 $want_merge = 1;
436             }
437             }
438 986 100       3474 if ($want_merge) {
439 721         3168 $next->merge($this);
440 721         29276 $aggs[$i+1]{object} = $next;
441             } else {
442 265         2910 last AGGREGATOR;
443             }
444             }
445             }
446              
447             # collect file size and mtime for all files of this aggregate
448             sub _debug_aggregate {
449 30     30   20310 my($self) = @_;
450 270         530 my @aggs = sort { $a->{secs} <=> $b->{secs} }
451 180         615 map { { interval => $_, secs => $self->interval_secs($_)} }
452 30 50       145 $self->interval, @{$self->aggregator || []};
  30         100  
453 30         110 my $report = [];
454 30         145 for my $i (0..$#aggs) {
455 180         13065 my $this = Storable::dclone $self;
456 180         835 $this->interval($aggs[$i]{interval});
457 180         435 my $rfile = $this->rfile;
458 180         2300 my @stat = stat $rfile;
459 180         1445 push @$report, {rfile => $rfile, size => $stat[7], mtime => $stat[9]};
460             }
461 30         480 $report;
462             }
463              
464             # (void) $self->_assert_symlink()
465             sub _assert_symlink {
466 1646     1646   5859 my($self) = @_;
467 1646         5934 my $recentrecentfile = File::Spec->catfile
468             (
469             $self->localroot,
470             sprintf
471             (
472             "%s.recent",
473             $self->filenameroot
474             )
475             );
476 1646 50       83550 if ($Config{d_symlink} eq "define") {
477 1646         4480 my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
478 1646 100       28847 if (-l $recentrecentfile) {
479 1625         13834 my $found_symlink = readlink $recentrecentfile;
480 1625 100       7458 if ($found_symlink eq $self->rfilename) {
481 1610         5225 return;
482             } else {
483 15         30 $howto_create_symlink = 2;
484             }
485             } else {
486 21         65 $howto_create_symlink = 1;
487             }
488 36 100       96 if (1 == $howto_create_symlink) {
489 21 50       72 symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
490             } else {
491 15         165 unlink "$recentrecentfile.$$"; # may fail
492 15 50       35 symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
493 15 50       450 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
494             }
495             } else {
496 0         0 warn "Warning: symlinks not supported on this system, doing a copy instead\n";
497 0         0 unlink "$recentrecentfile.$$"; # may fail
498 0 0       0 cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
499 0 0       0 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
500             }
501             }
502              
503             =head2 $hashref = $obj->delayed_operations
504              
505             A hash of hashes containing unlink and rmdir operations which had to
506             wait until the recentfile got unhidden in order to not confuse
507             downstream mirrors (in case we have some).
508              
509             =cut
510              
511             sub delayed_operations {
512 41     41 1 136 my($self) = @_;
513 41         308 my $x = $self->_delayed_operations;
514 41 100       1122 unless (defined $x) {
515 15         186 $x = {
516             unlink => {},
517             rmdir => {},
518             };
519 15         103 $self->_delayed_operations ($x);
520             }
521 41         329 return $x;
522             }
523              
524             =head2 $done = $obj->done
525              
526             C<$done> is a reference to a L
527             object that keeps track of rsync activities. Only needed and used when
528             we are a mirroring slave.
529              
530             =cut
531              
532             sub done {
533 119     119 1 1142 my($self) = @_;
534 119         1241 my $done = $self->_done;
535 119 100       1605 if (!$done) {
536 15         262 require File::Rsync::Mirror::Recentfile::Done;
537 15         443 $done = File::Rsync::Mirror::Recentfile::Done->new();
538 15         126 $done->_rfinterval ($self->interval);
539 15         303 $self->_done ( $done );
540             }
541 119         1247 return $done;
542             }
543              
544             =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
545              
546             Stores the remote I locally as a tempfile. The caller is
547             responsible to remove the file after use.
548              
549             Note: if you're intending to act as an rsync server for other slaves,
550             then you must prefer this method to fetch that file with
551             get_remotefile(). Otherwise downstream mirrors would expect you to
552             already have mirrored all the files that are in the I
553             before you have them mirrored.
554              
555             =cut
556              
557             sub get_remote_recentfile_as_tempfile {
558 74     74 1 418 my($self) = @_;
559 74         598 mkpath $self->localroot;
560 74         6070 my $fh;
561             my $trfilename;
562 74 100       478 if ( $self->_use_tempfile() ) {
563 43 100       734 if ($self->ttl_reached) {
564 10         79 $fh = $self->_current_tempfile_fh;
565 10         138 $trfilename = $self->rfilename;
566             } else {
567 33         257 return $self->_current_tempfile;
568             }
569             } else {
570 31         438 $trfilename = $self->rfilename;
571             }
572              
573 41         155 my $dst;
574 41 50       412 if ($fh) {
575 0         0 $dst = $self->_current_tempfile;
576             } else {
577 41         466 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
578 41         309 $dst = $fh->filename;
579 41         826 $self->_current_tempfile ($dst);
580 41         369 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
  41         213  
581 41 100 66     1008 if (defined $rfile && -e $rfile) {
582             # saving on bandwidth. Might need to be configurable
583             # $self->bandwidth_is_cheap?
584 27 50       354 cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
585             }
586             }
587 41         419786 my $src = join ("/",
588             $self->remoteroot,
589             $trfilename,
590             );
591 41 50       281 if ($self->verbose) {
592 0 0       0 my $doing = -e $dst ? "Sync" : "Get";
593 0         0 my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
594 0         0 my $LFH = $self->_logfilehandle;
595 0         0 printf $LFH
596             (
597             "%-4s %d (1/1/%s) temp %s ... ",
598             $doing,
599             time,
600             $self->interval,
601             $display_dst,
602             );
603             }
604 41         475 my $gaveup = 0;
605 41         126 my $retried = 0;
606 41         893 local($ENV{LANG}) = "C";
607 41         350 while (!$self->rsync->exec(
608             src => $src,
609             dst => $dst,
610             )) {
611 0         0 $self->register_rsync_error ($self->rsync->err);
612 0 0       0 if (++$retried >= 3) {
613 0         0 warn "XXX giving up";
614 0         0 $gaveup = 1;
615 0         0 last;
616             }
617             }
618 41 50       2316539 if ($gaveup) {
619 0         0 my $LFH = $self->_logfilehandle;
620 0         0 printf $LFH "Warning: gave up mirroring %s, will try again later", $self->interval;
621             } else {
622 41         648 $self->_refresh_internals ($dst);
623 41         1358 $self->have_mirrored (Time::HiRes::time);
624 41         655 $self->un_register_rsync_error ();
625             }
626 41         285 $self->unseed;
627 41 50       327 if ($self->verbose) {
628 0         0 my $LFH = $self->_logfilehandle;
629 0         0 print $LFH "DONE\n";
630             }
631 41         498 my $mode = 0644;
632 41 50       1928 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
633 41         1181 return $dst;
634             }
635              
636             sub _verified_tempdir {
637 41     41   155 my($self) = @_;
638 41         384 my $tempdir = $self->__verified_tempdir();
639 41 100       524 return $tempdir if defined $tempdir;
640 20 50       125 unless ($tempdir = $self->tempdir) {
641 20         198 $tempdir = $self->localroot;
642             }
643 20 50       511 unless (-d $tempdir) {
644 0         0 mkpath $tempdir;
645             }
646 20         178 $self->__verified_tempdir($tempdir);
647 20         264 return $tempdir;
648             }
649              
650             sub _get_remote_rat_provide_tempfile_object {
651 41     41   292 my($self, $trfilename) = @_;
652 41         353 my $_verified_tempdir = $self->_verified_tempdir;
653 41         626 my $fh = File::Temp->new
654             (TEMPLATE => sprintf(".FRMRecent-%s-XXXX",
655             $trfilename,
656             ),
657             DIR => $_verified_tempdir,
658             SUFFIX => $self->serializer_suffix,
659             UNLINK => $self->_use_tempfile,
660             );
661 41         32843 my $mode = 0644;
662 41         276 my $dst = $fh->filename;
663 41 50       1288 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
664 41 100       292 if ($self->_use_tempfile) {
665 10         124 $self->_current_tempfile_fh ($fh); # delay self destruction
666             }
667 41         454 return $fh;
668             }
669              
670             sub _logfilehandle {
671 0     0   0 my($self) = @_;
672 0         0 my $fh;
673 0 0       0 if (my $vl = $self->verboselog) {
674 0 0       0 open $fh, ">>", $vl or die "Could not open >> '$vl': $!";
675             } else {
676 0         0 $fh = \*STDERR;
677             }
678 0         0 return $fh;
679             }
680              
681             =head2 $localpath = $obj->get_remotefile ( $relative_path )
682              
683             Rsyncs one single remote file to local filesystem.
684              
685             Note: no locking is done on this file. Any number of processes may
686             mirror this object.
687              
688             Note II: do not use for recentfiles. If you are a cascading
689             slave/server combination, it would confuse other slaves. They would
690             expect the contents of these recentfiles to be available. Use
691             get_remote_recentfile_as_tempfile() instead.
692              
693             =cut
694              
695             sub get_remotefile {
696 0     0 1 0 my($self, $path) = @_;
697 0         0 my $dst = File::Spec->catfile($self->localroot, $path);
698 0         0 mkpath dirname $dst;
699 0 0       0 if ($self->verbose) {
700 0 0       0 my $doing = -e $dst ? "Sync" : "Get";
701 0         0 my $LFH = $self->_logfilehandle;
702 0         0 printf $LFH
703             (
704             "%-4s %d (1/1/%s) %s ... ",
705             $doing,
706             time,
707             $self->interval,
708             $path,
709             );
710             }
711 0         0 local($ENV{LANG}) = "C";
712 0 0       0 my $remoteroot = $self->remoteroot or die "Alert: missing remoteroot. Cannot continue";
713 0         0 while (!$self->rsync->exec(
714             src => join("/",
715             $remoteroot,
716             $path),
717             dst => $dst,
718             )) {
719 0         0 $self->register_rsync_error ($self->rsync->err);
720             }
721 0         0 $self->un_register_rsync_error ();
722 0 0       0 if ($self->verbose) {
723 0         0 my $LFH = $self->_logfilehandle;
724 0         0 print $LFH "DONE\n";
725             }
726 0         0 return $dst;
727             }
728              
729             =head2 $obj->interval ( $interval_spec )
730              
731             Get/set accessor. $interval_spec is a string and described below in
732             the section INTERVAL SPEC.
733              
734             =cut
735              
736             sub interval {
737 69899     69899 1 211858 my ($self, $interval) = @_;
738 69899 100       213068 if (@_ >= 2) {
739 5060         23012 $self->_interval($interval);
740 5060         38317 $self->_rfile(undef);
741             }
742 69899         241570 $interval = $self->_interval;
743 69899 100       487308 unless (defined $interval) {
744             # do not ask the $self too much, it recurses!
745 1         5 require Carp;
746 1         198 Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
747             }
748 69898         343330 return $interval;
749             }
750              
751             =head2 $secs = $obj->interval_secs ( $interval_spec )
752              
753             $interval_spec is described below in the section INTERVAL SPEC. If
754             empty defaults to the inherent interval for this object.
755              
756             =cut
757              
758             sub interval_secs {
759 26039     26039 1 291301 my ($self, $interval) = @_;
760 26039   66     92047 $interval ||= $self->interval;
761 26038 50       69592 unless (defined $interval) {
762 0         0 die "interval_secs() called without argument on an object without a declared one";
763             }
764 26038 100       204491 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
765             die "Could not determine seconds from interval[$interval]";
766 26037 100 33     221622 if ($interval eq "Z") {
    50          
767 961         5822 return MAX_INT;
768             } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
769 25076         165984 return $seconds{$t}*$n;
770             } else {
771 0         0 die "Invalid interval specification: n[$n]t[$t]";
772             }
773             }
774              
775             =head2 $obj->localroot ( $localroot )
776              
777             Get/set accessor. The local root of the tree. Guaranteed without
778             trailing slash.
779              
780             =cut
781              
782             sub localroot {
783 11492     11492 1 37041 my ($self, $localroot) = @_;
784 11492 100       41645 if (@_ >= 2) {
785 1663         20319 $localroot =~ s|/$||;
786 1663         10350 $self->_localroot($localroot);
787 1663         18602 $self->_rfile(undef);
788             }
789 11492         51233 $localroot = $self->_localroot;
790             }
791              
792             =head2 $ret = $obj->local_path($path_found_in_recentfile)
793              
794             Combines the path to our local mirror and the path of an object found
795             in this I. In other words: the target of a mirror operation.
796              
797             Implementation note: We split on slashes and then use
798             File::Spec::catfile to adjust to the local operating system.
799              
800             =cut
801              
802             sub local_path {
803 1435     1435 1 5452 my($self,$path) = @_;
804 1435 50       3942 unless (defined $path) {
805             # seems like a degenerated case
806 0         0 return $self->localroot;
807             }
808 1435         7124 my @p = split m|/|, $path;
809 1435         7204 File::Spec->catfile($self->localroot,@p);
810             }
811              
812             =head2 (void) $obj->lock
813              
814             Locking is implemented with an C on a locking directory
815             (C<.lock> appended to $rfile).
816              
817             =cut
818              
819             sub lock {
820 3088     3088 1 9444 my ($self) = @_;
821             # not using flock because it locks on filehandles instead of
822             # old school ressources.
823 3088 50       17028 my $locked = $self->_is_locked and return;
824 3088         24924 my $rfile = $self->rfile;
825             # XXX need a way to allow breaking the lock
826 3088         8386 my $start = time;
827 3088   50     13395 my $locktimeout = $self->locktimeout || 600;
828 3088         26868 my %have_warned;
829 3088         8835 my $lockdir = "$rfile.lock";
830 3088         11025 my $procfile = "$lockdir/process";
831 3088         306794 GETLOCK: while (not mkdir $lockdir) {
832 0 0       0 if (open my $fh, "<", $procfile) {
833 0         0 chomp(my $process = <$fh>);
834 0 0       0 if (0) {
    0          
    0          
835 0         0 } elsif ($process !~ /^\d+$/) {
836 0 0       0 warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown}++;
837             } elsif ($$ == $process) {
838 0         0 last GETLOCK;
839             } elsif (kill 0, $process) {
840 0 0       0 warn "Warning: process $process holds a lock in '$lockdir', waiting..." unless $have_warned{$process}++;
841             } else {
842 0         0 warn "Warning: breaking lock held by process $process";
843 0         0 sleep 1;
844 0         0 last GETLOCK;
845             }
846             } else {
847 0 0       0 warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown}++;
848             }
849 0         0 Time::HiRes::sleep 0.01;
850 0 0       0 if (time - $start > $locktimeout) {
851 0         0 die "Could not acquire lockdirectory '$rfile.lock': $!";
852             }
853             } # GETLOCK
854 3088 50       207026 open my $fh, ">", $procfile or die "Could not open >$procfile\: $!";
855 3088         56478 print $fh $$, "\n";
856 3088 50       87642 close $fh or die "Could not close: $!";
857 3088         18135 $self->_is_locked (1);
858             }
859              
860             =head2 (void) $obj->merge ($other)
861              
862             Bulk update of this object with another one. It's used to merge a
863             smaller and younger $other object into the current one. If this file
864             is a C file, then we normally do not merge in objects of type
865             C; this can be overridden by setting
866             keep_delete_objects_forever. But if we encounter an object of type
867             delete we delete the corresponding C object if we have it.
868              
869             If there is nothing to be merged, nothing is done.
870              
871             =cut
872              
873             sub merge {
874 721     721 1 2587 my($self, $other) = @_;
875 721         4103 $self->_merge_sanitycheck ( $other );
876 721         3857 $other->lock;
877 721   50     9102 my $other_recent = $other->recent_events || [];
878 721         3428 $self->lock;
879 721         11304 $self->_merge_locked ( $other, $other_recent );
880 721         18588 $self->unlock;
881 721         6703 $other->unlock;
882             }
883              
884             sub _merge_locked {
885 721     721   2428 my($self, $other, $other_recent) = @_;
886 721   50     3726 my $my_recent = $self->recent_events || [];
887              
888             # calculate the target time span
889 721 100       3917 my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
890 721 50       3234 my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
891 721         1787 my $oldest_allowed = 0;
892 721         1994 my $something_done;
893 721 100       2694 unless ($my_recent->[0]) {
894             # obstetrics
895 75         135 $something_done = 1;
896             }
897 721 50       2426 if ($epoch) {
898 721 100 50     3183 if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
    100 100        
899 233         5540 $oldest_allowed = 0;
900 233         448 $something_done = 1;
901             } elsif (my $merged = $self->merged) {
902 446         1850 my $secs = $self->interval_secs();
903 446   50     5733 $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
904 446 50 33     4433 if (@$other_recent and
905             _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)
906             ) {
907 0         0 $oldest_allowed = $other_recent->[-1]{epoch};
908             }
909             }
910 721   100     5497 while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
911 1378         2838 pop @$my_recent;
912 1378         5835 $something_done = 1;
913             }
914             }
915              
916 721         1811 my %have_path;
917 721         3216 my $other_recent_filtered = [];
918 721         2308 for my $oev (@$other_recent) {
919 22079   50     79376 my $oevepoch = $oev->{epoch} || 0;
920 22079 50       70748 next if _bigfloatlt($oevepoch, $oldest_allowed);
921 22079         65013 my $path = $oev->{path};
922 22079 50       100573 next if $have_path{$path}++;
923 22079 100 100     67858 if ( $self->interval eq "Z"
      66        
924             and $oev->{type} eq "delete"
925             and ! $self->keep_delete_objects_forever
926             ) {
927             # do nothing
928             } else {
929 22052 100 100     89313 if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
930 4697         8082 $something_done = 1;
931             }
932 22052         168399 push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
933             }
934             }
935 721 100       4840 if ($something_done) {
936 679         3688 $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
937             }
938             }
939              
940             sub _merge_something_done {
941 679     679   3312 my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
942 679         2148 my $recent = [];
943 679         1605 my $epoch_conflict = 0;
944 679         1394 my $last_epoch;
945 679   100     2829 ZIP: while (@$other_recent_filtered || @$my_recent) {
946 49100         106319 my $event;
947 49100 100 100     255288 if (!@$my_recent ||
      100        
948             @$other_recent_filtered && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch})) {
949 20998         43709 $event = shift @$other_recent_filtered;
950             } else {
951 28102         57015 $event = shift @$my_recent;
952 28102 100       194381 next ZIP if $have_path->{$event->{path}}++;
953             }
954 32857 100 100     194666 $epoch_conflict=1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
955 32857         68802 $last_epoch = $event->{epoch};
956 32857         156086 push @$recent, $event;
957             }
958 679 100       2688 if ($epoch_conflict) {
959 10         25 my %have_epoch;
960 10         45 for (my $i = $#$recent;$i>=0;$i--) {
961 270         390 my $epoch = $recent->[$i]{epoch};
962 270 100       810 if ($have_epoch{$epoch}++) {
963 10         40 while ($have_epoch{$epoch}) {
964 10         45 $epoch = _increase_a_bit($epoch);
965             }
966 10         30 $recent->[$i]{epoch} = $epoch;
967 10         40 $have_epoch{$epoch}++;
968             }
969             }
970             }
971 679 100 100     3805 if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
972 233         4734 $self->dirtymark ( $other->dirtymark );
973             }
974 679         13840 $self->write_recent($recent);
975             $other->merged({
976             time => Time::HiRes::time, # not used anywhere
977             epoch => $recent->[0]{epoch},
978 679         6433 into_interval => $self->interval, # not used anywhere
979             });
980 679         2969 $other->write_recent($other_recent);
981             }
982              
983             sub _merge_sanitycheck {
984 721     721   2534 my($self, $other) = @_;
985 721 50       2469 if ($self->interval_secs <= $other->interval_secs) {
986 0         0 require Carp;
987 0         0 Carp::confess
988             (sprintf
989             (
990             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
991             $self->interval_secs,
992             $other->interval_secs,
993             ));
994             }
995             }
996              
997             =head2 merged
998              
999             Hashref denoting when this recentfile has been merged into some other
1000             at which epoch.
1001              
1002             =cut
1003              
1004             sub merged {
1005 14498     14498 1 52304 my($self, $set) = @_;
1006 14498 100       90147 if (defined $set) {
1007 4542         19127 $self->_merged ($set);
1008             }
1009 14498         72307 my $merged = $self->_merged;
1010 14498         83585 my $into;
1011 14498 100 100     88891 if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
      100        
1012             # sanity checks
1013 9397 50       80109 if ($into eq $self->interval) {
    50          
1014 0         0 require Carp;
1015 0         0 Carp::cluck(sprintf
1016             (
1017             "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
1018             $into,
1019             $self->interval,
1020             ));
1021             } elsif ($self->interval_secs($into) < $self->interval_secs) {
1022 0         0 require Carp;
1023 0         0 Carp::cluck(sprintf
1024             (
1025             "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
1026             $self->interval_secs($into),
1027             $self->interval_secs,
1028             $self->interval,
1029             ));
1030             }
1031             }
1032 14498         63321 $merged;
1033             }
1034              
1035             =head2 $hashref = $obj->meta_data
1036              
1037             Returns the hashref of metadata that the server has to add to the
1038             I.
1039              
1040             =cut
1041              
1042             sub meta_data {
1043 2707     2707 1 7298 my($self) = @_;
1044 2707         7119 my $ret = $self->{meta};
1045 2707         9152 for my $m (
1046             "aggregator",
1047             "canonize",
1048             "comment",
1049             "dirtymark",
1050             "filenameroot",
1051             "interval",
1052             "merged",
1053             "minmax",
1054             "protocol",
1055             "serializer_suffix",
1056             ) {
1057 27070         86656 my $v = $self->$m;
1058 27070 100       145095 if (defined $v) {
1059 23272         87090 $ret->{$m} = $v;
1060             }
1061             }
1062             # XXX need to reset the Producer if I am a writer, keep it when I
1063             # am a reader
1064             $ret->{Producers} ||= {
1065 2707   50     116423 __PACKAGE__, "$VERSION", # stringified it looks better
1066             '$0', $0,
1067             'time', Time::HiRes::time,
1068             };
1069 2707   66     13254 $ret->{dirtymark} ||= Time::HiRes::time;
1070 2707         12439 return $ret;
1071             }
1072              
1073             =head2 $success = $obj->mirror ( %options )
1074              
1075             Mirrors the files in this I as reported by
1076             C. Options named C, C, C are passed
1077             through to the C call. The boolean option C,
1078             if true, causes C to only rsync C
1079             and keep track of the rsynced files so that future calls will rsync
1080             different files until all files are brought to sync.
1081              
1082             =cut
1083              
1084             sub mirror {
1085 32     32 1 5530 my($self, %options) = @_;
1086 32         534 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
1087 32         4410 $self->_use_tempfile (1);
1088             # skip-deletes is inadequat for passthrough within mirror. We
1089             # would never reach uptodateness when a delete were on a
1090             # borderline
1091 32         294 my %passthrough = map { ($_ => $options{$_}) } qw(before after max);
  96         1071  
1092 32         460 my ($recent_events) = $self->recent_events(%passthrough);
1093 32         498 my(@error, @dlcollector); # download-collector: array containing paths we need
1094 32         134 my $first_item = 0;
1095 32         164 my $last_item = $#$recent_events;
1096 32         195 my $done = $self->done;
1097 32         352 my $pathdb = $self->_pathdb;
1098 32         366 ITEM: for my $i ($first_item..$last_item) {
1099 2641         6710 my $status = +{};
1100 2641         12238 $self->_mirror_item
1101             (
1102             $i,
1103             $recent_events,
1104             $last_item,
1105             $done,
1106             $pathdb,
1107             \@dlcollector,
1108             \%options,
1109             $status,
1110             \@error,
1111             );
1112 2641 100       18215 last if $i == $last_item;
1113 2615 100       11256 if ($status->{mustreturn}){
1114 6 100 66     100 if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
1115             # looks like a bug somewhere else
1116 5         302 my $t = $self->_current_tempfile;
1117 5 50       806 unlink $t or die "Could not unlink '$t': $!";
1118 5         62 $self->_current_tempfile(undef);
1119 5         87 $self->_use_tempfile(0);
1120             }
1121 6         3046 return;
1122             }
1123             }
1124 26 100       213 if (@dlcollector) {
1125 17         89 my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
  17         157  
1126 17 50 33     313 if (!$success || $@) {
1127 0         0 warn "Warning: Unknown error while mirroring: $@";
1128 0         0 push @error, $@;
1129 0         0 sleep 1;
1130             }
1131             }
1132 26 50       372 if ($self->verbose) {
1133 0         0 my $LFH = $self->_logfilehandle;
1134 0         0 print $LFH "DONE\n";
1135             }
1136             # once we've gone to the end we consider ourselves free of obligations
1137 26         513 $self->unseed;
1138 26         200 $self->_mirror_unhide_tempfile ($trecentfile);
1139 26         354 $self->_mirror_perform_delayed_ops(\%options);
1140 26         5464 return !@error;
1141             }
1142              
1143             sub _mirror_item {
1144 2641     2641   8138 my($self,
1145             $i,
1146             $recent_events,
1147             $last_item,
1148             $done,
1149             $pathdb,
1150             $dlcollector,
1151             $options,
1152             $status,
1153             $error,
1154             ) = @_;
1155 2641         6518 my $recent_event = $recent_events->[$i];
1156 2641 100       11793 return if $done->covered ( $recent_event->{epoch} );
1157 1435 100       4455 if ($pathdb) {
1158 815         3180 my $rec = $pathdb->{$recent_event->{path}};
1159 815 50 66     3183 if ($rec && $rec->{recentepoch}) {
1160 260 50       1227 if (_bigfloatgt
1161             ( $rec->{recentepoch}, $recent_event->{epoch} )){
1162 0         0 $done->register ($recent_events, [$i]);
1163 0         0 return;
1164             }
1165             }
1166             }
1167 1435         6000 my $dst = $self->local_path($recent_event->{path});
1168 1435 100       34028 if ($recent_event->{type} eq "new"){
    50          
1169 1411         4929 $self->_mirror_item_new
1170             (
1171             $dst,
1172             $i,
1173             $last_item,
1174             $recent_events,
1175             $recent_event,
1176             $dlcollector,
1177             $pathdb,
1178             $status,
1179             $error,
1180             $options,
1181             );
1182             } elsif ($recent_event->{type} eq "delete") {
1183 24         59 my $activity;
1184 24 50       91 if ($options->{'skip-deletes'}) {
1185 0         0 $activity = "skipped";
1186             } else {
1187 24         565 my @lstat = lstat $dst;
1188 24 100 33     272 if (! -e _) {
    50          
1189 9         75 $activity = "not_found";
1190             } elsif (-l _ or not -d _) {
1191 15         100 $self->delayed_operations->{unlink}{$dst}++;
1192 15         70 $activity = "deleted";
1193             } else {
1194 0         0 $self->delayed_operations->{rmdir}{$dst}++;
1195 0         0 $activity = "deleted";
1196             }
1197             }
1198 24         194 $done->register ($recent_events, [$i]);
1199 24 100       154 if ($pathdb) {
1200 9         75 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1201             }
1202             } else {
1203 0         0 warn "Warning: invalid upload type '$recent_event->{type}'";
1204             }
1205             }
1206              
1207             sub _mirror_item_new {
1208 1411     1411   4705 my($self,
1209             $dst,
1210             $i,
1211             $last_item,
1212             $recent_events,
1213             $recent_event,
1214             $dlcollector,
1215             $pathdb,
1216             $status,
1217             $error,
1218             $options,
1219             ) = @_;
1220 1411 50       4536 if ($self->verbose) {
1221 0 0       0 my $doing = -e $dst ? "Sync" : "Get";
1222 0         0 my $LFH = $self->_logfilehandle;
1223             printf $LFH
1224             (
1225             "%-4s %d (%d/%d/%s) %s ... ",
1226             $doing,
1227             time,
1228             1+$i,
1229             1+$last_item,
1230             $self->interval,
1231             $recent_event->{path},
1232 0         0 );
1233             }
1234 1411   50     11052 my $max_files_per_connection = $self->max_files_per_connection || 42;
1235 1411         9344 my $success;
1236 1411 50       4055 if ($self->verbose) {
1237 0         0 my $LFH = $self->_logfilehandle;
1238 0         0 print $LFH "\n";
1239             }
1240 1411         13755 push @$dlcollector, { rev => $recent_event, i => $i };
1241 1411 100       5114 if (@$dlcollector >= $max_files_per_connection) {
1242 11         50 $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
  11         109  
1243 11         131 my $sleep = $self->sleep_per_connection;
1244 11 50       140 $sleep = 0.42 unless defined $sleep;
1245 11         4622925 Time::HiRes::sleep $sleep;
1246 11 100       286 if ($options->{piecemeal}) {
1247 6         91 $status->{mustreturn} = 1;
1248 6         109 return;
1249             }
1250             } else {
1251 1400         5361 return;
1252             }
1253 5 50 33     150 if (!$success || $@) {
1254 0         0 warn "Warning: Error while mirroring: $@";
1255 0         0 push @$error, $@;
1256 0         0 sleep 1;
1257             }
1258 5 50       105 if ($self->verbose) {
1259 0         0 my $LFH = $self->_logfilehandle;
1260 0         0 print $LFH "DONE\n";
1261             }
1262             }
1263              
1264             sub _mirror_dlcollector {
1265 28     28   268 my($self,$xcoll,$pathdb,$recent_events) = @_;
1266 28         159 my $success = $self->mirror_path([map {$_->{rev}{path}} @$xcoll]);
  1411         5546  
1267 28 100       4922 if ($pathdb) {
1268 18         236 $self->_mirror_register_path($pathdb,[map {$_->{rev}} @$xcoll],"rsync");
  806         3996  
1269             }
1270 28         453 $self->done->register($recent_events, [map {$_->{i}} @$xcoll]);
  1411         84941  
1271 28         3019 @$xcoll = ();
1272 28         226 return $success;
1273             }
1274              
1275             sub _mirror_register_path {
1276 27     27   212 my($self,$pathdb,$coll,$activity) = @_;
1277 27         140 my $time = time;
1278 27         201 for my $item (@$coll) {
1279             $pathdb->{$item->{path}} =
1280             {
1281             recentepoch => $item->{epoch},
1282 815         12219 ($activity."_on") => $time,
1283             };
1284             }
1285             }
1286              
1287             sub _mirror_unhide_tempfile {
1288 26     26   213 my($self, $trecentfile) = @_;
1289 26         3457 my $rfile = $self->rfile;
1290 26 50       4059 if (rename $trecentfile, $rfile) {
1291             # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
1292             } else {
1293 0         0 require Carp;
1294 0         0 Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
1295             }
1296 26         206 $self->_use_tempfile (0);
1297 26 100       329 if (my $ctfh = $self->_current_tempfile_fh) {
1298 10         330 $ctfh->unlink_on_destroy (0);
1299 10         297 $self->_current_tempfile_fh (undef);
1300             }
1301             }
1302              
1303             sub _mirror_perform_delayed_ops {
1304 26     26   1341 my($self,$options) = @_;
1305 26         187 my $delayed = $self->delayed_operations;
1306 26         61 for my $dst (keys %{$delayed->{unlink}}) {
  26         642  
1307 30 100       1105 unless (unlink $dst) {
1308 15         120 require Carp;
1309 15 50       105 Carp::cluck ( "Warning: Error while unlinking '$dst': $!" ) if $options->{verbose};
1310             }
1311 30 50       125 if ($self->verbose) {
1312 0         0 my $doing = "Del";
1313 0         0 my $LFH = $self->_logfilehandle;
1314 0         0 printf $LFH
1315             (
1316             "%-4s %d (%s) %s DONE\n",
1317             $doing,
1318             time,
1319             $self->interval,
1320             $dst,
1321             );
1322 0         0 delete $delayed->{unlink}{$dst};
1323             }
1324             }
1325 26         153 for my $dst (sort {length($b) <=> length($a)} keys %{$delayed->{rmdir}}) {
  0         0  
  26         256  
1326 0 0       0 unless (rmdir $dst) {
1327 0         0 require Carp;
1328 0 0       0 Carp::cluck ( "Warning: Error on rmdir '$dst': $!" ) if $options->{verbose};
1329             }
1330 0 0       0 if ($self->verbose) {
1331 0         0 my $doing = "Del";
1332 0         0 my $LFH = $self->_logfilehandle;
1333 0         0 printf $LFH
1334             (
1335             "%-4s %d (%s) %s DONE\n",
1336             $doing,
1337             time,
1338             $self->interval,
1339             $dst,
1340             );
1341 0         0 delete $delayed->{rmdir}{$dst};
1342             }
1343             }
1344             }
1345              
1346             =head2 $success = $obj->mirror_path ( $arrref | $path )
1347              
1348             If the argument is a scalar it is treated as a path. The remote path
1349             is mirrored into the local copy. $path is the path found in the
1350             I, i.e. it is relative to the root directory of the
1351             mirror.
1352              
1353             If the argument is an array reference then all elements are treated as
1354             a path below the current tree and all are rsynced with a single
1355             command (and a single connection).
1356              
1357             =cut
1358              
1359             sub mirror_path {
1360 28     28 1 145 my($self,$path) = @_;
1361             # XXX simplify the two branches such that $path is treated as
1362             # [$path] maybe even demand the argument as an arrayref to
1363             # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1364             # interface)
1365 28 50 33     469 if (ref $path and ref $path eq "ARRAY") {
1366 28         134 my $dst = $self->localroot;
1367 28         5333 mkpath dirname $dst;
1368 28         265 my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
1369             lc $self->filenameroot,
1370             ),
1371             TMPDIR => 1,
1372             UNLINK => 0,
1373             );
1374 28         22741 for my $p (@$path) {
1375 1411         7557 print $fh $p, "\n";
1376             }
1377 28         1379 $fh->flush;
1378 28         386 $fh->unlink_on_destroy(1);
1379 28         569 my $gaveup = 0;
1380 28         87 my $retried = 0;
1381 28         893 local($ENV{LANG}) = "C";
1382 28         211 while (!$self->rsync->exec
1383             (
1384             src => join("/",
1385             $self->remoteroot,
1386             ),
1387             dst => $dst,
1388             'files-from' => $fh->filename,
1389             )) {
1390 0         0 my(@err) = $self->rsync->err;
1391 0 0 0     0 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1392 0 0       0 if ($self->verbose) {
1393 0         0 my $LFH = $self->_logfilehandle;
1394 0         0 print $LFH "Info: ignoring link_stat error '@err'";
1395             }
1396 0         0 return 1;
1397             }
1398 0         0 $self->register_rsync_error (@err);
1399 0 0       0 if (++$retried >= 3) {
1400 0         0 my $batchsize = @$path;
1401 0         0 warn "The number of rsync retries now reached 3 within a batch of size $batchsize. Error was '@err'. Giving up now, will retry later, ";
1402 0         0 $gaveup = 1;
1403 0         0 last;
1404             }
1405 0         0 sleep 1;
1406             }
1407 28 50       1904905 unless ($gaveup) {
1408 28         878 $self->un_register_rsync_error ();
1409             }
1410             } else {
1411 0         0 my $dst = $self->local_path($path);
1412 0         0 mkpath dirname $dst;
1413 0         0 local($ENV{LANG}) = "C";
1414 0         0 while (!$self->rsync->exec
1415             (
1416             src => join("/",
1417             $self->remoteroot,
1418             $path
1419             ),
1420             dst => $dst,
1421             )) {
1422 0         0 my(@err) = $self->rsync->err;
1423 0 0 0     0 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1424 0 0       0 if ($self->verbose) {
1425 0         0 my $LFH = $self->_logfilehandle;
1426 0         0 print $LFH "Info: ignoring link_stat error '@err'";
1427             }
1428 0         0 return 1;
1429             }
1430 0         0 $self->register_rsync_error (@err);
1431             }
1432 0         0 $self->un_register_rsync_error ();
1433             }
1434 28         46643 return 1;
1435             }
1436              
1437             sub _my_ignore_link_stat_errors {
1438 0     0   0 my($self) = @_;
1439 0         0 my $x = $self->ignore_link_stat_errors;
1440 0 0       0 $x = 1 unless defined $x;
1441 0         0 return $x;
1442             }
1443              
1444             sub _my_current_rfile {
1445 6619     6619   19979 my($self) = @_;
1446 6619         14716 my $rfile;
1447 6619 100       24364 if ($self->_use_tempfile) {
1448 33         370 $rfile = $self->_current_tempfile;
1449             }
1450 6619 100 66     112124 unless ($rfile && -s $rfile) {
1451 6586         26069 $rfile = $self->rfile;
1452             }
1453 6619         27739 return $rfile;
1454             }
1455              
1456             =head2 $path = $obj->naive_path_normalize ($path)
1457              
1458             Takes an absolute unix style path as argument and canonicalizes it to
1459             a shorter path if possible, removing things like double slashes or
1460             C and removes references to C<../> directories to get a shorter
1461             unambiguos path. This is used to make the code easier that determines
1462             if a file passed to C is indeed below our C.
1463              
1464             =cut
1465              
1466             sub naive_path_normalize {
1467 1292     1292 1 3532 my($self,$path) = @_;
1468 1292         14917 $path =~ s|/+|/|g;
1469 1292         6206 1 while $path =~ s|/[^/]+/\.\./|/|;
1470 1292         4365 $path =~ s|/$||;
1471 1292         3879 $path;
1472             }
1473              
1474             =head2 $ret = $obj->read_recent_1 ( $data )
1475              
1476             Delegate of C on protocol 1
1477              
1478             =cut
1479              
1480             sub read_recent_1 {
1481 6487     6487 1 19336 my($self, $data) = @_;
1482 6487         19806 return $data->{recent};
1483             }
1484              
1485             =head2 $array_ref = $obj->recent_events ( %options )
1486              
1487             Note: the code relies on the resource being written atomically. We
1488             cannot lock because we may have no write access. If the caller has
1489             write access (eg. aggregate() or update()), it has to care for any
1490             necessary locking and it MUST write atomically.
1491              
1492             If C<$options{after}> is specified, only file events after this
1493             timestamp are returned.
1494              
1495             If C<$options{before}> is specified, only file events before this
1496             timestamp are returned.
1497              
1498             If C<$options{max}> is specified only a maximum of this many most
1499             recent events is returned.
1500              
1501             If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1502             will be returned.
1503              
1504             If C<$options{contains}> is specified the value must be a hash
1505             reference containing a query. The query may contain the keys C,
1506             C, and C. Each represents a condition that must be met. If
1507             there is more than one such key, the conditions are ANDed.
1508              
1509             If C<$options{info}> is specified, it must be a hashref. This hashref
1510             will be filled with metadata about the unfiltered recent_events of
1511             this object, in key C there is the first item, in key C
1512             is the last.
1513              
1514             =cut
1515              
1516             sub recent_events {
1517 6597     6597 1 205787 my ($self, %options) = @_;
1518 6597         17876 my $info = $options{info};
1519 6597 100       25815 if ($self->is_slave) {
1520             # XXX seems dubious, might produce tempfiles without removing them?
1521 37         666 $self->get_remote_recentfile_as_tempfile;
1522             }
1523 6597 50       85687 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1524 6597 100       112389 -e $rfile_or_tempfile or return [];
1525 6487         40493 my $suffix = $self->serializer_suffix;
1526 6487         52674 my ($data) = eval {
1527 6487         29389 $self->_try_deserialize
1528             (
1529             $suffix,
1530             $rfile_or_tempfile,
1531             );
1532             };
1533 6487         7239358 my $err = $@;
1534 6487 50 33     63842 if ($err or !$data) {
1535 0         0 return [];
1536             }
1537 6487         15482 my $re;
1538 6487 50       49701 if (reftype $data eq 'ARRAY') { # protocol 0
1539 0         0 $re = $data;
1540             } else {
1541 6487         29341 $re = $self->_recent_events_protocol_x
1542             (
1543             $data,
1544             $rfile_or_tempfile,
1545             );
1546             }
1547 6487 100       28710 return $re unless grep {defined $options{$_}} qw(after before contains max skip-deletes);
  32435         170980  
1548 2005         9835 $self->_recent_events_handle_options ($re, \%options);
1549             }
1550              
1551             # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
1552             sub _recent_events_handle_options {
1553 2005     2005   7155 my($self, $re, $options) = @_;
1554 2005         6340 my $last_item = $#$re;
1555 2005         5595 my $info = $options->{info};
1556 2005 100       6380 if ($info) {
1557 2000         8350 $info->{first} = $re->[0];
1558 2000         6065 $info->{last} = $re->[-1];
1559             }
1560 2005 100       8910 if (defined $options->{after}) {
1561 5 50       100 if ($re->[0]{epoch} > $options->{after}) {
1562 5 50       135 if (
1563             my $f = first
1564 125     125   455 {$re->[$_]{epoch} <= $options->{after}}
1565             0..$#$re
1566             ) {
1567 5         25 $last_item = $f-1;
1568             }
1569             } else {
1570 0         0 $last_item = -1;
1571             }
1572             }
1573 2005         4290 my $first_item = 0;
1574 2005 100       7735 if (defined $options->{before}) {
1575 2000 100       13720 if ($re->[0]{epoch} > $options->{before}) {
1576 1850 100       28730 if (
1577             my $f = first
1578 136645     136645   428615 {$re->[$_]{epoch} < $options->{before}}
1579             0..$last_item
1580             ) {
1581 625         2200 $first_item = $f;
1582             }
1583             } else {
1584 150         545 $first_item = 0;
1585             }
1586             }
1587 2005 50 66     24695 if (0 != $first_item || -1 != $last_item) {
1588 2005         20880 @$re = splice @$re, $first_item, 1+$last_item-$first_item;
1589             }
1590 2005 50       8670 if ($options->{'skip-deletes'}) {
1591 0         0 @$re = grep { $_->{type} ne "delete" } @$re;
  0         0  
1592             }
1593 2005 50       8960 if (my $contopt = $options->{contains}) {
1594 0         0 my $seen_allowed = 0;
1595 0         0 for my $allow (qw(epoch path type)) {
1596 0 0       0 if (exists $contopt->{$allow}) {
1597 0         0 $seen_allowed++;
1598 0         0 my $v = $contopt->{$allow};
1599 0         0 @$re = grep { $_->{$allow} eq $v } @$re;
  0         0  
1600             }
1601             }
1602 0 0       0 if (keys %$contopt > $seen_allowed) {
1603 0         0 require Carp;
1604 0         0 Carp::confess
1605             (sprintf "unknown query: %s", join ", ", %$contopt);
1606             }
1607             }
1608 2005 50 33     12555 if ($options->{max} && @$re > $options->{max}) {
1609 0         0 @$re = splice @$re, 0, $options->{max};
1610             }
1611 2005         24920 $re;
1612             }
1613              
1614             sub _recent_events_protocol_x {
1615 6487     6487   22460 my($self,
1616             $data,
1617             $rfile_or_tempfile,
1618             ) = @_;
1619 6487         46932 my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
1620             # we may be reading meta for the first time
1621 6487         15742 while (my($k,$v) = each %{$data->{meta}}) {
  68455         536699  
1622 61968 100       177575 if ($k ne lc $k){ # "Producers"
1623 6487         27464 $self->{ORIG}{$k} = $v;
1624 6487         24259 next;
1625             }
1626 55481 100       190688 next if defined $self->$k;
1627 10460         62446 $self->$k($v);
1628             }
1629 6487         28630 my $re = $self->$meth ($data);
1630 6487         13770 my $minmax;
1631 6487 50       134162 if (my @stat = stat $rfile_or_tempfile) {
1632 6487         35226 $minmax = { mtime => $stat[9] };
1633             } else {
1634             # defensive because ABH encountered:
1635              
1636             #### Sync 1239828608 (1/1/Z) temp .../authors/.FRMRecent-RECENT-Z.yaml-
1637             #### Ydr_.yaml ... DONE
1638             #### Cannot stat '/mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-
1639             #### Ydr_.yaml': No such file or directory at /usr/lib/perl5/site_perl/
1640             #### 5.8.8/File/Rsync/Mirror/Recentfile.pm line 1558.
1641             #### unlink0: /mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-Ydr_.yaml is
1642             #### gone already at cpan-pause.pl line 0
1643            
1644 0         0 my $LFH = $self->_logfilehandle;
1645 0         0 print $LFH "Warning (maybe harmless): Cannot stat '$rfile_or_tempfile': $!"
1646             }
1647 6487 50       23727 if (@$re) {
1648 6487         24109 $minmax->{min} = $re->[-1]{epoch};
1649 6487         108554 $minmax->{max} = $re->[0]{epoch};
1650             }
1651 6487         27371 $self->minmax ( $minmax );
1652 6487         62351 return $re;
1653             }
1654              
1655             sub _try_deserialize {
1656 6487     6487   22682 my($self,
1657             $suffix,
1658             $rfile_or_tempfile,
1659             ) = @_;
1660 6487 50       23896 if ($suffix eq ".yaml") {
    0          
1661 6487         60385 require YAML::Syck;
1662 6487         32351 YAML::Syck::LoadFile($rfile_or_tempfile);
1663             } elsif ($HAVE->{"Data::Serializer"}) {
1664             my $serializer = Data::Serializer->new
1665 0         0 ( serializer => $serializers{$suffix} );
1666             my $serialized = do
1667 0         0 {
1668 0 0       0 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1669 0         0 local $/;
1670 0         0 <$fh>;
1671             };
1672 0         0 $serializer->raw_deserialize($serialized);
1673             } else {
1674 0         0 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1675             }
1676             }
1677              
1678             sub _refresh_internals {
1679 41     41   368 my($self, $dst) = @_;
1680 41         423 my $class = ref $self;
1681 41         1735 my $rfpeek = $class->new_from_file ($dst);
1682 41         418 for my $acc (qw(
1683             _merged
1684             minmax
1685             )) {
1686 82         1213 $self->$acc ( $rfpeek->$acc );
1687             }
1688 41         1049 my $old_dirtymark = $self->dirtymark;
1689 41         797 my $new_dirtymark = $rfpeek->dirtymark;
1690 41 100 66     1554 if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
      100        
1691 5         52 $self->done->reset;
1692 5         67 $self->dirtymark ( $new_dirtymark );
1693 5         70 $self->_uptodateness_ever_reached(0);
1694 5         150 $self->seed;
1695             }
1696             }
1697              
1698             =head2 $ret = $obj->rfilename
1699              
1700             Just the basename of our I, composed from C,
1701             a dash, C, and C. E.g. C
1702              
1703             =cut
1704              
1705             sub rfilename {
1706 6682     6682 1 46347 my($self) = @_;
1707 6682         20606 my $file = sprintf("%s-%s%s",
1708             $self->filenameroot,
1709             $self->interval,
1710             $self->serializer_suffix,
1711             );
1712 6682         210822 return $file;
1713             }
1714              
1715             =head2 $str = $self->remote_dir
1716              
1717             The directory we are mirroring from.
1718              
1719             =cut
1720              
1721             sub remote_dir {
1722 15     15 1 185 my($self, $set) = @_;
1723 15 100       65 if (defined $set) {
1724 5         45 $self->_remote_dir ($set);
1725             }
1726 15         100 my $x = $self->_remote_dir;
1727 15         165 $self->is_slave (1);
1728 15         150 return $x;
1729             }
1730              
1731             =head2 $str = $obj->remoteroot
1732              
1733             =head2 (void) $obj->remoteroot ( $set )
1734              
1735             Get/Set the composed prefix needed when rsyncing from a remote module.
1736             If remote_host, remote_module, and remote_dir are set, it is composed
1737             from these.
1738              
1739             =cut
1740              
1741             sub remoteroot {
1742 83     83 1 672 my($self, $set) = @_;
1743 83 100       634 if (defined $set) {
1744 14         117 $self->_remoteroot($set);
1745             }
1746 83         574 my $remoteroot = $self->_remoteroot;
1747 83 100       1336 unless (defined $remoteroot) {
1748 5 50       210 $remoteroot = sprintf
    50          
    50          
1749             (
1750             "%s%s%s",
1751             defined $self->remote_host ? ($self->remote_host."::") : "",
1752             defined $self->remote_module ? ($self->remote_module."/") : "",
1753             defined $self->remote_dir ? $self->remote_dir : "",
1754             );
1755 5         35 $self->_remoteroot($remoteroot);
1756             }
1757 83         941 return $remoteroot;
1758             }
1759              
1760             =head2 (void) $obj->split_rfilename ( $recentfilename )
1761              
1762             Inverse method to C. C<$recentfilename> is a plain filename
1763             of the pattern
1764              
1765             $filenameroot-$interval$serializer_suffix
1766              
1767             e.g.
1768              
1769             RECENT-1M.yaml
1770              
1771             This filename is split into its parts and the parts are fed to the
1772             object itself.
1773              
1774             =cut
1775              
1776             sub split_rfilename {
1777 5     5 1 35 my($self, $rfname) = @_;
1778 5         45 my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
1779 5 50       80 if (my($f,$i,$s) = $rfname =~ $splitter) {
1780 5         35 $self->filenameroot ($f);
1781 5         60 $self->interval ($i);
1782 5         25 $self->serializer_suffix ($s);
1783             } else {
1784 0         0 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1785             }
1786 5         50 return;
1787             }
1788              
1789             =head2 my $rfile = $obj->rfile
1790              
1791             Returns the full path of the I
1792              
1793             =cut
1794              
1795             sub rfile {
1796 16096     16096 1 38809 my($self) = @_;
1797 16096         55499 my $rfile = $self->_rfile;
1798 16096 100       113087 return $rfile if defined $rfile;
1799 4980         16988 $rfile = File::Spec->catfile
1800             ($self->localroot,
1801             $self->rfilename,
1802             );
1803 4980         31392 $self->_rfile ($rfile);
1804 4980         45941 return $rfile;
1805             }
1806              
1807             =head2 $rsync_obj = $obj->rsync
1808              
1809             The File::Rsync object that this object uses for communicating with an
1810             upstream server.
1811              
1812             =cut
1813              
1814             sub rsync {
1815 69     69 1 308 my($self) = @_;
1816 69         592 my $rsync = $self->_rsync;
1817 69 100       987 unless (defined $rsync) {
1818 26   50     244 my $rsync_options = $self->rsync_options || {};
1819 26 50       355 if ($HAVE->{"File::Rsync"}) {
1820 26         534 $rsync = File::Rsync->new($rsync_options);
1821 26         149462 $self->_rsync($rsync);
1822             } else {
1823 0         0 die "File::Rsync required for rsync operations. Cannot continue";
1824             }
1825             }
1826 69         837 return $rsync;
1827             }
1828              
1829             =head2 (void) $obj->register_rsync_error(@err)
1830              
1831             =head2 (void) $obj->un_register_rsync_error()
1832              
1833             Register_rsync_error is called whenever the File::Rsync object fails
1834             on an exec (say, connection doesn't succeed). It issues a warning and
1835             sleeps for an increasing amount of time. Un_register_rsync_error
1836             resets the error count. See also accessor C.
1837              
1838             =cut
1839              
1840             {
1841             my $no_success_count = 0;
1842             my $no_success_time = 0;
1843             sub register_rsync_error {
1844 0     0 1 0 my($self, @err) = @_;
1845 0         0 chomp @err;
1846 0         0 $no_success_time = time;
1847 0         0 $no_success_count++;
1848 0         0 my $max_rsync_errors = $self->max_rsync_errors;
1849 0 0       0 $max_rsync_errors = MAX_INT unless defined $max_rsync_errors;
1850 0 0 0     0 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1851 0         0 require Carp;
1852 0         0 Carp::confess
1853             (
1854             sprintf
1855             (
1856             "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1857             $self->interval,
1858             join(" ",@err),
1859             $no_success_count,
1860             ));
1861             }
1862 0         0 my $sleep = 12 * $no_success_count;
1863 0 0       0 $sleep = 300 if $sleep > 300;
1864 0         0 require Carp;
1865 0         0 Carp::cluck
1866             (sprintf
1867             (
1868             "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1869             scalar(localtime($no_success_time)),
1870             $self->interval,
1871             join(" ",@err),
1872             $sleep,
1873             ));
1874 0         0 sleep $sleep
1875             }
1876             sub un_register_rsync_error {
1877 69     69 1 396 my($self) = @_;
1878 69         275 $no_success_time = 0;
1879 69         2099 $no_success_count = 0;
1880             }
1881             }
1882              
1883             =head2 $clone = $obj->_sparse_clone
1884              
1885             Clones just as much from itself that it does not hurt. Experimental
1886             method.
1887              
1888             Note: what fits better: sparse or shallow? Other suggestions?
1889              
1890             =cut
1891              
1892             sub _sparse_clone {
1893 3226     3226   11401 my($self) = @_;
1894 3226         31983 my $new = bless {}, ref $self;
1895 3226         11220 for my $m (qw(
1896             _interval
1897             _localroot
1898             _remoteroot
1899             _rfile
1900             _use_tempfile
1901             aggregator
1902             filenameroot
1903             ignore_link_stat_errors
1904             is_slave
1905             max_files_per_connection
1906             protocol
1907             rsync_options
1908             serializer_suffix
1909             sleep_per_connection
1910             tempdir
1911             verbose
1912             )) {
1913 51616         368962 my $o = $self->$m;
1914 51616 100       461161 $o = Storable::dclone $o if ref $o;
1915 51616         141309 $new->$m($o);
1916             }
1917 3226         29755 $new;
1918             }
1919              
1920             =head2 $boolean = OBJ->ttl_reached ()
1921              
1922             =cut
1923              
1924             sub ttl_reached {
1925 43     43 1 288 my($self) = @_;
1926 43   100     377 my $have_mirrored = $self->have_mirrored || 0;
1927 43         703 my $now = Time::HiRes::time;
1928 43         451 my $ttl = $self->ttl;
1929 43 50       706 $ttl = 24.2 unless defined $ttl;
1930 43 100       366 if ($now > $have_mirrored + $ttl) {
1931 10         69 return 1;
1932             }
1933 33         365 return 0;
1934             }
1935              
1936             =head2 (void) $obj->unlock()
1937              
1938             Unlocking is implemented with an C on a locking directory
1939             (C<.lock> appended to $rfile).
1940              
1941             =cut
1942              
1943             sub unlock {
1944 8286     8286 1 22312 my($self) = @_;
1945 8286 100       30251 return unless $self->_is_locked;
1946 3088         24448 my $rfile = $self->rfile;
1947 3088 50       188251 unlink "$rfile.lock/process" or warn "Could not unlink lockfile '$rfile.lock/process': $!";
1948 3088 50       129474 rmdir "$rfile.lock" or warn "Could not rmdir lockdir '$rfile.lock': $!";;
1949 3088         17446 $self->_is_locked (0);
1950             }
1951              
1952             =head2 unseed
1953              
1954             Sets this recentfile in the state of not 'seeded'.
1955              
1956             =cut
1957             sub unseed {
1958 67     67 1 279 my($self) = @_;
1959 67         537 $self->seeded(0);
1960             }
1961              
1962             =head2 $ret = $obj->update ($path, $type)
1963              
1964             =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1965              
1966             =head2 $ret = $obj->update ()
1967              
1968             Enter one file into the local I. $path is the (usually
1969             absolute) path. If the path is outside I tree, then it is
1970             ignored.
1971              
1972             C<$type> is one of C or C.
1973              
1974             Events of type C may set $dirty_epoch. $dirty_epoch is normally
1975             not used and the epoch is calculated by the update() routine itself
1976             based on current time. But if there is the demand to insert a
1977             not-so-current file into the dataset, then the caller sets
1978             $dirty_epoch. This causes the epoch of the registered event to become
1979             $dirty_epoch or -- if the exact value given is already taken -- a tiny
1980             bit more. As compensation the dirtymark of the whole dataset is set to
1981             now or the current epoch, whichever is higher. Note: setting the
1982             dirty_epoch to the future is prohibited as it's very unlikely to be
1983             intended: it definitely might wreak havoc with the index files.
1984              
1985             The new file event is unshifted (or, if dirty_epoch is set, inserted
1986             at the place it belongs to, according to the rule to have a sequence
1987             of strictly decreasing timestamps) to the array of recent_events and
1988             the array is shortened to the length of the timespan allowed. This is
1989             usually the timespan specified by the interval of this recentfile but
1990             as long as this recentfile has not been merged to another one, the
1991             timespan may grow without bounds.
1992              
1993             The third form runs an update without inserting a new file. This may
1994             be desired to truncate a recentfile.
1995              
1996             =cut
1997             sub _epoch_monotonically_increasing {
1998 1614     1614   5435 my($self,$epoch,$recent) = @_;
1999 1614 100       4661 return $epoch unless @$recent; # the first one goes unoffended
2000 1579 100       25426 if (_bigfloatgt("".$epoch,$recent->[0]{epoch})) {
2001 1414         6627 return $epoch;
2002             } else {
2003 165         1195 return _increase_a_bit($recent->[0]{epoch});
2004             }
2005             }
2006             sub update {
2007 1646     1646 1 369946 my($self,$path,$type,$dirty_epoch) = @_;
2008 1646 50 66     10399 if (defined $path or defined $type or defined $dirty_epoch) {
      66        
2009 1292 50       3863 die "update called without path argument" unless defined $path;
2010 1292 50       5043 die "update called without type argument" unless defined $type;
2011 1292 50       16621 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
2012             }
2013 1646         12891 $self->lock;
2014 1646         32262 my $ctx = $self->_locked_batch_update([{path=>$path,type=>$type,epoch=>$dirty_epoch}]);
2015 1646 100       17565 $self->write_recent($ctx->{recent}) if $ctx->{something_done};
2016 1646         13109 $self->_assert_symlink;
2017 1646         9856 $self->unlock;
2018             }
2019              
2020             =head2 $obj->batch_update($batch)
2021              
2022             Like update but for many files. $batch is an arrayref containing
2023             hashrefs with the structure
2024              
2025             {
2026             path => $path,
2027             type => $type,
2028             epoch => $epoch,
2029             }
2030              
2031              
2032              
2033             =cut
2034             sub batch_update {
2035 0     0 1 0 my($self,$batch) = @_;
2036 0         0 $self->lock;
2037 0         0 my $ctx = $self->_locked_batch_update($batch);
2038 0 0       0 $self->write_recent($ctx->{recent}) if $ctx->{something_done};
2039 0         0 $self->_assert_symlink;
2040 0         0 $self->unlock;
2041             }
2042             sub _locked_batch_update {
2043 1646     1646   7498 my($self,$batch) = @_;
2044 1646         4401 my $something_done = 0;
2045 1646         6209 my $recent = $self->recent_events;
2046 1646 100       7285 unless ($recent->[0]) {
2047             # obstetrics
2048 35         65 $something_done = 1;
2049             }
2050 1646         5494 my %paths_in_recent = map { $_->{path} => undef } @$recent;
  57097         170618  
2051 1646         9937 my $interval = $self->interval;
2052 1646         6414 my $canonmeth = $self->canonize;
2053 1646 100       11407 unless ($canonmeth) {
2054 390         715 $canonmeth = "naive_path_normalize";
2055             }
2056 1646         3693 my $oldest_allowed = 0;
2057 1646         3532 my $setting_new_dirty_mark = 0;
2058 1646         3126 my $console;
2059 1646 50 66     7211 if ($self->verbose && @$batch > 1) {
2060 0         0 eval {require Time::Progress};
  0         0  
2061 0 0       0 warn "dollarat[$@]" if $@;
2062 0         0 $| = 1;
2063 0         0 $console = new Time::Progress;
2064 0         0 $console->attr( min => 1, max => scalar @$batch );
2065 0         0 print "\n";
2066             }
2067 1646         15707 my $i = 0;
2068 1646         3230 my $memo_splicepos;
2069 1646   0     7858 ITEM: for my $item (sort {($b->{epoch}||0) <=> ($a->{epoch}||0)} @$batch) {
  0   0     0  
2070 1646         3499 $i++;
2071 1646 50 33     6743 print $console->report( "\rdone %p elapsed: %L (%l sec), ETA %E (%e sec)", $i ) if $console and not $i % 50;
2072 1646         9849 my $ctx = $self->_update_batch_item($item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,\%paths_in_recent,$memo_splicepos);
2073 1646         6272 $something_done = $ctx->{something_done};
2074 1646         3785 $oldest_allowed = $ctx->{oldest_allowed};
2075 1646         3065 $setting_new_dirty_mark = $ctx->{setting_new_dirty_mark};
2076 1646         5748 $recent = $ctx->{recent};
2077 1646         7404 $memo_splicepos = $ctx->{memo_splicepos};
2078             }
2079 1646 50       5861 print "\n" if $console;
2080 1646 100       5755 if ($setting_new_dirty_mark) {
2081 32         96 $oldest_allowed = 0;
2082             }
2083 1646         4763 TRUNCATE: while (@$recent) {
2084 2650 100       11380 if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
2085 1004         2040 pop @$recent;
2086 1004         4981 $something_done = 1;
2087             } else {
2088 1646         5512 last TRUNCATE;
2089             }
2090             }
2091 1646         31965 return {something_done=>$something_done,recent=>$recent};
2092             }
2093             sub _update_batch_item {
2094 1646     1646   8787 my($self,$item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,$paths_in_recent,$memo_splicepos) = @_;
2095 1646         5553 my($path,$type,$dirty_epoch) = @{$item}{qw(path type epoch)};
  1646         8255  
2096 1646 50 66     9186 if (defined $path or defined $type or defined $dirty_epoch) {
      66        
2097 1292         6401 $path = $self->$canonmeth($path);
2098             }
2099             # you must calculate the time after having locked, of course
2100 1646         7511 my $now = Time::HiRes::time;
2101              
2102 1646         5813 my $epoch;
2103 1646 100 66     6361 if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
2104 32         94 $epoch = $dirty_epoch;
2105             } else {
2106 1614         7492 $epoch = $self->_epoch_monotonically_increasing($now,$recent);
2107             }
2108 1646   50     5698 $recent ||= [];
2109 1646         11450 my $merged = $self->merged;
2110 1646 100 66     10320 if ($merged->{epoch} && !$setting_new_dirty_mark) {
2111 877         5476 my $virtualnow = _bigfloatmax($now,$epoch);
2112             # for the lower bound I think we need no big math, we calc already
2113 877         3716 my $secs = $self->interval_secs();
2114 877         18489 $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
2115             } else {
2116             # as long as we are not merged at all, no limits!
2117             }
2118 1646         5744 my $lrd = $self->localroot;
2119 1646 100 66     27285 if (defined $path && $path =~ s|^\Q$lrd\E||) {
2120 1292         5796 $path =~ s|^/||;
2121 1292         3230 my $splicepos;
2122             # remove the older duplicates of this $path, irrespective of $type:
2123 1292 100       3734 if (defined $dirty_epoch) {
2124 32         178 my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch,$paths_in_recent,$memo_splicepos);
2125 32         101 $recent = $ctx->{recent};
2126 32         104 $splicepos = $ctx->{splicepos};
2127 32         91 $epoch = $ctx->{epoch};
2128 32         236 my $dirtymark = $self->dirtymark;
2129 32         200 my $new_dm = $now;
2130 32 50       124 if (_bigfloatgt($epoch, $now)) { # just in case we had to increase it
2131 0         0 $new_dm = $epoch;
2132             }
2133 32         193 $self->dirtymark($new_dm);
2134 32         171 $setting_new_dirty_mark = 1;
2135 32 50 33     229 if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
2136 32         151 $self->merged(+{});
2137             }
2138             } else {
2139 1260         3425 $recent = [ grep { $_->{path} ne $path } @$recent ];
  44656         113993  
2140 1260         3362 $splicepos = 0;
2141             }
2142 1292 50       4457 if (defined $splicepos) {
2143 1292         23306 splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
2144 1292         5881 $paths_in_recent->{$path} = undef;
2145             }
2146 1292         3210 $memo_splicepos = $splicepos;
2147 1292         3162 $something_done = 1;
2148             }
2149             return
2150             {
2151 1646         19901 something_done => $something_done,
2152             oldest_allowed => $oldest_allowed,
2153             setting_new_dirty_mark => $setting_new_dirty_mark,
2154             recent => $recent,
2155             memo_splicepos => $memo_splicepos,
2156             }
2157             }
2158             sub _update_with_dirty_epoch {
2159 32     32   168 my($self,$path,$recent,$epoch,$paths_in_recent,$memo_splicepos) = @_;
2160 32         66 my $splicepos;
2161 32         119 my $new_recent = [];
2162 32 50       162 if (exists $paths_in_recent->{$path}) {
2163 0         0 my $cancel = 0;
2164 0         0 KNOWN_EVENT: for my $i (0..$#$recent) {
2165 0 0       0 if ($recent->[$i]{path} eq $path) {
2166 0 0       0 if ($recent->[$i]{epoch} eq $epoch) {
2167             # nothing to do
2168 0         0 $cancel = 1;
2169 0         0 last KNOWN_EVENT;
2170             }
2171             } else {
2172 0         0 push @$new_recent, $recent->[$i];
2173             }
2174             }
2175 0 0       0 @$recent = @$new_recent unless $cancel;
2176             }
2177 32 50 33     272 if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
    50          
2178 0         0 $splicepos = 0;
2179             } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
2180 32         111 $splicepos = @$recent;
2181             } else {
2182 0         0 my $startingpoint;
2183 0 0 0     0 if (_bigfloatgt($memo_splicepos<=$#$recent && $epoch, $recent->[$memo_splicepos]{epoch})) {
2184 0         0 $startingpoint = 0;
2185             } else {
2186 0         0 $startingpoint = $memo_splicepos;
2187             }
2188 0         0 RECENT: for my $i ($startingpoint..$#$recent) {
2189 0         0 my $ev = $recent->[$i];
2190 0 0       0 if ($epoch eq $recent->[$i]{epoch}) {
2191 0 0       0 $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
2192             }
2193 0 0       0 if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
2194 0         0 $splicepos = $i;
2195 0         0 last RECENT;
2196             }
2197             }
2198             }
2199             return {
2200 32         259 recent => $recent,
2201             splicepos => $splicepos,
2202             epoch => $epoch,
2203             }
2204             }
2205              
2206             =head2 seed
2207              
2208             Sets this recentfile in the state of 'seeded' which means it has to
2209             re-evaluate its uptodateness.
2210              
2211             =cut
2212             sub seed {
2213 28     28 1 135 my($self) = @_;
2214 28         156 $self->seeded(1);
2215             }
2216              
2217             =head2 seeded
2218              
2219             Tells if the recentfile is in the state 'seeded'.
2220              
2221             =cut
2222             sub seeded {
2223 134     134 1 763 my($self, $set) = @_;
2224 134 100       715 if (defined $set) {
2225 95         669 $self->_seeded ($set);
2226             }
2227 134         972 my $x = $self->_seeded;
2228 134 100       1205 unless (defined $x) {
2229 8         31 $x = 0;
2230 8         35 $self->_seeded ($x);
2231             }
2232 134         1057 return $x;
2233             }
2234              
2235             =head2 uptodate
2236              
2237             True if this object has mirrored the complete interval covered by the
2238             current recentfile.
2239              
2240             =cut
2241             sub uptodate {
2242 56     56 1 233 my($self) = @_;
2243 56         176 my $uptodate;
2244             my $why;
2245 56 100 66     328 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2246 19         97 $why = "saturated";
2247 19         53 $uptodate = 1;
2248             }
2249             # it's too easy to misconfigure ttl and related timings and then
2250             # never reach uptodateness, so disabled 2009-03-22
2251 56         708 if (0 and not defined $uptodate) {
2252             if ($self->ttl_reached){
2253             $why = "ttl_reached returned true, so we are not uptodate";
2254             $uptodate = 0 ;
2255             }
2256             }
2257 56 100       458 unless (defined $uptodate) {
2258             # look if recentfile has unchanged timestamp
2259 37         729 my $minmax = $self->minmax;
2260 37 100       674 if (exists $minmax->{mtime}) {
2261 21         222 my $rfile = $self->_my_current_rfile;
2262 21         591 my @stat = stat $rfile;
2263 21 50       149 if (@stat) {
2264 21         91 my $mtime = $stat[9];
2265 21 50 33     595 if (defined $mtime && defined $minmax->{mtime} && $mtime > $minmax->{mtime}) {
      33        
2266 0         0 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
2267 0         0 $uptodate = 0;
2268             } else {
2269 21         190 my $covered = $self->done->covered(@$minmax{qw(max min)});
2270 21 50       305 $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
2271 21         124 $uptodate = $covered;
2272             }
2273             } else {
2274 0         0 require Carp;
2275 0         0 $why = "Could not stat '$rfile': $!";
2276 0         0 Carp::cluck($why);
2277 0         0 $uptodate = 0;
2278             }
2279             }
2280             }
2281 56 100       556 unless (defined $uptodate) {
2282 16         44 $why = "fallthrough, so not uptodate";
2283 16         40 $uptodate = 0;
2284             }
2285 56 100       242 if ($uptodate) {
2286 34         203 $self->_uptodateness_ever_reached(1);
2287             }
2288 56         519 my $remember =
2289             {
2290             uptodate => $uptodate,
2291             why => $why,
2292             };
2293 56         431 $self->_remember_last_uptodate_call($remember);
2294 56         704 return $uptodate;
2295             }
2296              
2297             =head2 $obj->write_recent ($recent_files_arrayref)
2298              
2299             Writes a I based on the current reflection of the current
2300             state of the tree limited by the current interval.
2301              
2302             =cut
2303             sub _resort {
2304 0     0   0 my($self) = @_;
2305 0         0 @{$_[1]} = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @{$_[1]};
  0         0  
  0         0  
  0         0  
2306 0         0 return;
2307             }
2308             sub write_recent {
2309 2707     2707 1 9396 my ($self,$recent) = @_;
2310 2707 50       9242 die "write_recent called without argument" unless defined $recent;
2311 2707         4914 my $Last_epoch;
2312 2707         15083 SANITYCHECK: for my $i (0..$#$recent) {
2313 99709 50 66     408106 if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
2314 0         0 require Carp;
2315             Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
2316 0         0 $recent->[$i]{epoch}, $Last_epoch, $self->interval);
2317             # you may want to:
2318             # $self->_resort($recent);
2319             # last SANITYCHECK;
2320             }
2321 99709         337820 $Last_epoch = $recent->[$i]{epoch};
2322             }
2323 2707         13361 my $minmax = $self->minmax;
2324 2707 100 100     49637 if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
2325 1668 50 33     13158 $minmax->{max} = @$recent && exists $recent->[0]{epoch} ? $recent->[0]{epoch} : undef;
2326             }
2327 2707 100 100     16152 if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
2328 583 50 33     4105 $minmax->{min} = @$recent && exists $recent->[-1]{epoch} ? $recent->[-1]{epoch} : undef;
2329             }
2330 2707         14784 $self->minmax($minmax);
2331 2707         33620 my $meth = sprintf "write_%d", $self->protocol;
2332 2707         36636 $self->$meth($recent);
2333             }
2334              
2335             =head2 $obj->write_0 ($recent_files_arrayref)
2336              
2337             Delegate of C on protocol 0
2338              
2339             =cut
2340              
2341             sub write_0 {
2342 0     0 1 0 my ($self,$recent) = @_;
2343 0         0 my $rfile = $self->rfile;
2344 0         0 YAML::Syck::DumpFile("$rfile.new",$recent);
2345 0 0       0 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2346             }
2347              
2348             =head2 $obj->write_1 ($recent_files_arrayref)
2349              
2350             Delegate of C on protocol 1
2351              
2352             =cut
2353              
2354             sub write_1 {
2355 2707     2707 1 8559 my ($self,$recent) = @_;
2356 2707         9416 my $rfile = $self->rfile;
2357 2707         10134 my $suffix = $self->serializer_suffix;
2358 2707         23236 my $data = {
2359             meta => $self->meta_data,
2360             recent => $recent,
2361             };
2362 2707         7015 my $serialized;
2363 2707 100       8075 if ($suffix eq ".yaml") {
    50          
2364 2692         14021 $serialized = YAML::Syck::Dump($data);
2365             } elsif ($HAVE->{"Data::Serializer"}) {
2366             my $serializer = Data::Serializer->new
2367 15         95 ( serializer => $serializers{$suffix} );
2368 15         10115 $serialized = $serializer->raw_serialize($data);
2369             } else {
2370 0         0 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
2371             }
2372 2707 50       2613892 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2373 2707         90668 print $fh $serialized;
2374 2707 50       85149 close $fh or die "Could not close '$rfile.new': $!";
2375 2707 50       3333833 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2376             }
2377              
2378             BEGIN {
2379 8     8   97181 my $nq = qr/[^"]+/; # non-quotes
2380 8         86 my @pod_lines =
2381 8         33 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
  32         388  
  32         871  
  136         467  
2382              
2383             =head1 SERIALIZERS
2384              
2385             The following suffixes are supported and trigger the use of these
2386             serializers:
2387              
2388             =over 4
2389              
2390             =item C<< ".yaml" => "YAML::Syck" >>
2391              
2392             =item C<< ".json" => "JSON" >>
2393              
2394             =item C<< ".sto" => "Storable" >>
2395              
2396             =item C<< ".dd" => "Data::Dumper" >>
2397              
2398             =back
2399              
2400             =cut
2401              
2402             BEGIN {
2403 8     8   114 my @pod_lines =
2404 8         32 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
  64         2871  
  240         813  
2405              
2406             =head1 INTERVAL SPEC
2407              
2408             An interval spec is a primitive way to express time spans. Normally it
2409             is composed from an integer and a letter.
2410              
2411             As a special case, a string that consists only of the single letter
2412             C, stands for MAX_INT seconds.
2413              
2414             The following letters express the specified number of seconds:
2415              
2416             =over 4
2417              
2418             =item C<< s => 1 >>
2419              
2420             =item C<< m => 60 >>
2421              
2422             =item C<< h => 60*60 >>
2423              
2424             =item C<< d => 60*60*24 >>
2425              
2426             =item C<< W => 60*60*24*7 >>
2427              
2428             =item C<< M => 60*60*24*30 >>
2429              
2430             =item C<< Q => 60*60*24*90 >>
2431              
2432             =item C<< Y => 60*60*24*365.25 >>
2433              
2434             =back
2435              
2436             =cut
2437              
2438             =head1 SEE ALSO
2439              
2440             L,
2441             L,
2442             L
2443              
2444             =head1 BUGS
2445              
2446             Please report any bugs or feature requests through the web interface
2447             at
2448             L.
2449             I will be notified, and then you'll automatically be notified of
2450             progress on your bug as I make changes.
2451              
2452             =head1 KNOWN BUGS
2453              
2454             Memory hungry: it seems all memory is allocated during the initial
2455             rsync where a list of all files is maintained in memory.
2456              
2457             =head1 SUPPORT
2458              
2459             You can find documentation for this module with the perldoc command.
2460              
2461             perldoc File::Rsync::Mirror::Recentfile
2462              
2463             You can also look for information at:
2464              
2465             =over 4
2466              
2467             =item * RT: CPAN's request tracker
2468              
2469             L
2470              
2471             =item * AnnoCPAN: Annotated CPAN documentation
2472              
2473             L
2474              
2475             =item * CPAN Ratings
2476              
2477             L
2478              
2479             =item * Search CPAN
2480              
2481             L
2482              
2483             =back
2484              
2485              
2486             =head1 ACKNOWLEDGEMENTS
2487              
2488             Thanks to RJBS for module-starter.
2489              
2490             =head1 AUTHOR
2491              
2492             Andreas König
2493              
2494             =head1 COPYRIGHT & LICENSE
2495              
2496             Copyright 2008,2009 Andreas König.
2497              
2498             This program is free software; you can redistribute it and/or modify it
2499             under the same terms as Perl itself.
2500              
2501              
2502             =cut
2503              
2504             1; # End of File::Rsync::Mirror::Recentfile
2505              
2506             # Local Variables:
2507             # mode: cperl
2508             # cperl-indent-level: 4
2509             # End: