File Coverage

blib/lib/Prophet/Replica.pm
Criterion Covered Total %
statement 21 287 7.3
branch 0 110 0.0
condition 0 27 0.0
subroutine 7 62 11.2
pod 27 34 79.4
total 55 520 10.5


line stmt bran cond sub pod time code
1             package Prophet::Replica;
2 40     40   2692 use Any::Moose;
  40         39109  
  40         212  
3 40     40   20430 use Params::Validate qw(:all);
  40         66818  
  40         8153  
4 40     40   249 use File::Spec ();
  40         61  
  40         881  
5 40     40   170 use File::Path qw/mkpath/;
  40         55  
  40         2279  
6              
7 40     40   192 use constant state_db_uuid => 'state';
  40         53  
  40         3055  
8              
9 40     40   15420 use Prophet::App;
  40         177  
  40         176017  
10              
11             has metadata_store => (
12             is => 'rw',
13             isa => 'Prophet::MetadataStore',
14             documentation => 'Where metadata about other replicas is stored.',
15             );
16              
17              
18             has resolution_db_handle => (
19             is => 'rw',
20             isa => 'Prophet::Replica',
21             documentation => 'Where conflict resolutions are stored.',
22             );
23              
24             has is_resdb => (
25             is => 'rw',
26             isa => 'Bool',
27             documentation => 'Whether this replica is a resolution db or not.'
28             );
29              
30             has db_uuid => (
31             is => 'rw',
32             isa => 'Str',
33             documentation => 'The uuid of this replica.',
34             );
35 0     0 0   sub set_db_uuid { shift->db_uuid(@_) }
36              
37             has url => (
38             is => 'rw',
39             isa => 'Str',
40             documentation => 'Where this replica comes from.',
41             );
42              
43             has app_handle => (
44             is => 'ro',
45             isa => 'Prophet::App',
46             weak_ref => 1,
47             predicate => 'has_app_handle',
48             );
49              
50             has after_initialize => (
51             is => 'rw',
52             isa => 'CodeRef',
53             default => sub { sub {1} } # default returns a coderef
54             );
55              
56              
57             has uuid_generator => (
58             is => 'rw',
59             isa => 'Prophet::UUIDGenerator',
60             lazy => 1,
61             default => sub {
62             my $self = shift;
63             my $ug = Prophet::UUIDGenerator->new( uuid_scheme => 2 );
64             return $ug;
65              
66             }
67             );
68              
69              
70              
71             our $MERGETICKET_METATYPE = '_merge_tickets';
72              
73             =head1 NAME
74              
75             Prophet::Replica
76              
77             =head1 DESCRIPTION
78              
79             A base class for all Prophet replicas.
80              
81             =head1 METHODS
82              
83             =head3 get_handle
84              
85             Determines what replica class to use and instantiates it. Returns the
86             new replica object.
87              
88             =cut
89              
90             sub get_handle {
91 0     0 1   my $class = shift;
92 0 0         my %args = @_ == 1 ? %{ $_[0] } : @_;
  0            
93              
94 0           my ( $new_class, $scheme, $url ) = $class->_url_to_replica_class(%args);
95              
96 0 0         if ( !$new_class ) {
97 0           $class->log_fatal(
98 0           "I don't know how to handle the replica URL you provided - '@{[ $args{url}]}'."
99             ."\nIs your syntax correct?"
100             );
101             }
102              
103 0           Prophet::App->require($new_class);
104 0           my $handle = $new_class->new(%args);
105              
106 0 0 0       if ($handle->replica_exists && $handle->db_uuid) {
107 0           $handle->uuid_generator->set_uuid_scheme($handle->db_uuid);
108             }
109              
110 0           return $handle;
111             }
112              
113              
114              
115             sub initialize {
116 0     0 0   my $self = shift;
117 0           my %args = validate(
118             @_,
119             { db_uuid => 0,
120             replica_uuid => 0,
121             resdb_uuid => 0,
122             resdb_replica_uuid => 0,
123             }
124             );
125              
126 0 0         if ( !$self->fs_root_parent ) {
127              
128 0 0         if ( $self->can_write_changesets ) {
129 0           die "We can only create local prophet replicas. It looks like you're trying to create " . $self->url;
130             } else {
131 0           die "Prophet couldn't find a replica at \""
132             . $self->url
133             . "\"\n\n"
134             . "Please check the URL and try again.\n";
135              
136             }
137             }
138              
139 0 0         return undef if $self->replica_exists;
140              
141 0 0         $self->uuid_generator->set_uuid_scheme($args{'db_uuid'}) if ($args{db_uuid});
142              
143 0           for ( $self->_on_initialize_create_paths ) {
144 0           mkpath( [ File::Spec->catdir( $self->fs_root => $_ ) ] );
145             }
146              
147 0           $self->initialize_backend(%args);
148 0           $self->after_initialize->($self);
149             }
150              
151              
152              
153             =head2 store_local_metadata KEY => VALUE
154              
155             Takes a key and a value.
156              
157             Store some bit of metadata in a durable local datastore. Metadata isn't propagated
158             when replicas are synced.
159              
160             Returns true or false.
161              
162             =cut
163              
164             =head2 fetch_local_metadata KEY
165              
166             Takes a scalar key.
167              
168             Fetches a bit of metadata from the local metadata store.
169              
170             Returns the value of the key found in the local metadata store.
171              
172             Returns undef if there's no value for the key in the local metadata store.
173              
174             =cut
175              
176             sub replica_exists {
177 0     0 0   return 1; # XXX TODO HACK
178             }
179              
180             sub can_initialize {
181 0     0 0   return undef;
182             }
183              
184             =head3 _url_to_replica_class
185              
186             Returns the replica class for the given url based on its scheme.
187              
188             =cut
189              
190             sub _url_to_replica_class {
191 0     0     my $self = shift;
192 0           my %args = (@_);
193 0           my $url = $args{'url'};
194 0           my ( $scheme, $real_url ) = $url =~ /^([^:]*?):(.*)$/;
195              
196 0 0         return undef unless $scheme;
197              
198 0           for my $class (
199             ref( $args{app_handle} ) . "::Replica::" . $scheme,
200             "Prophet::Replica::".$scheme ) {
201 0 0         Prophet::App->try_to_require($class) || next;
202 0           return ( $class, $scheme, $real_url );
203             }
204 0           return undef;
205             }
206              
207             =head3 import_changesets { from => L ... }
208              
209             Given a L to import changes from, traverse all the
210             changesets we haven't seen before and integrate them into this replica.
211              
212             This routine calls L on the 'from' replica,
213             passing in the most recent changeset the current replica has seen
214             and a callback routine which calls L on the
215             local replica.
216              
217             That callback itself takes a callback, L
218             , which a replica implementation can use to perform some action
219             after a changeset is integrated into a peer. L
220             takes a paramhash, currently with only a single key, 'changeset'.
221              
222             =cut
223              
224             sub import_changesets {
225 0     0 1   my $self = shift;
226 0           my %args = validate(
227             @_,
228             { from => { isa => 'Prophet::Replica' },
229             resdb => { optional => 1 },
230             resolver => { optional => 1 },
231             resolver_class => { optional => 1 },
232             conflict_callback => { type => CODEREF, optional => 1 },
233             reporting_callback => { type => CODEREF, optional => 1 },
234             force => { optional => 1 },
235             }
236             );
237              
238 0           my $source = $args{'from'};
239              
240 0           $self->_check_db_uuids_on_merge(for => $source, force => $args{'force'});
241              
242 0 0         warn "The source (@{[$source->url]}) does not exist" unless ($source->replica_exists);
  0            
243              
244 0           $self->log_debug("Integrating changesets from ".$source->uuid. " after ". $self->last_changeset_from_source( $self->uuid ));
245              
246             $source->traverse_changesets(
247             after => $self->last_changeset_from_source( $source->uuid ),
248             before_load_changeset_callback => sub {
249 0     0     my %args = (@_);
250 0           my ($seq, $orig_uuid, $orig_seq, $key) = @{$args{changeset_metadata}};
  0            
251             # skip changesets we've seen before
252 0 0         if ( $self->has_seen_changeset( source_uuid => $orig_uuid, sequence_no => $orig_seq) ){
253             # If we've seen the changeset, yet we still got here, it means we saw it by original
254             # replica/sequence pair, but not # the direct upstream's uuid/sequence pair.
255             # recording that can help performance a whole bunch for next sync
256 0 0 0       if ($source->uuid && $seq > $self->last_changeset_from_source($source->uuid)) {
257 0           $self->record_last_changeset_from_replica( $source->uuid => $seq);
258             }
259 0           return undef;
260             } else {
261 0           return 1;
262             }
263              
264             },
265             callback => sub {
266 0     0     my %callback_args = (@_);
267             $self->integrate_changeset(
268             changeset => $callback_args{changeset},
269             conflict_callback => $args{'conflict_callback'},
270             reporting_callback => $args{'reporting_callback'},
271             resolver => $args{'resolver'},
272             resolver_class => $args{'resolver_class'},
273 0           resdb => $args{'resdb'},
274             );
275              
276 0 0         if ( ref( $callback_args{'after_integrate_changeset'} ) ) {
277 0           $callback_args{'after_integrate_changeset'}->( changeset => $callback_args{'changeset'} );
278             }
279              
280             }
281 0           );
282             }
283              
284             =head3 import_resolutions_from_remote_replica { from => L ... }
285              
286             Takes a L object (and possibly some optional arguments)
287             and imports its resolution changesets into this replica's resolution
288             database.
289              
290             Returns immediately if either the source replica or the target replica lack
291             a resolution database.
292              
293             =cut
294              
295             sub import_resolutions_from_remote_replica {
296 0     0 1   my $self = shift;
297 0           my %args = validate(
298             @_,
299             { from => { isa => 'Prophet::Replica' },
300             resolver => { optional => 1 },
301             resolver_class => { optional => 1 },
302             conflict_callback => { optional => 1 },
303             force => { optional => 1 },
304             }
305             );
306 0           my $source = $args{'from'};
307              
308 0 0         return unless $self->resolution_db_handle;
309 0 0         return unless $source->resolution_db_handle;
310              
311             $self->resolution_db_handle->import_changesets(
312             from => $source->resolution_db_handle,
313 0     0     resolver => sub { die "not implemented yet" },
314             force => $args{force},
315 0           );
316             }
317              
318             =head3 integrate_changeset L
319              
320             Given a L, integrate each and every change within that
321             changeset into the handle's replica.
322              
323             If there are conflicts, generate a nullification change, figure out a conflict
324             resolution and apply the nullification, original change and resolution all at
325             once (as three separate changes).
326              
327             If there are no conflicts, just apply the change.
328              
329             This routine also records that we've seen this changeset (and hence everything
330             before it) from both the peer who sent it to us AND the replica which originally
331             created it.
332              
333             =cut
334              
335             sub integrate_changeset {
336 0     0 1   my $self = shift;
337 0           my %args = validate(
338             @_,
339             { changeset => { isa => 'Prophet::ChangeSet' },
340             resolver => { optional => 1 },
341             resolver_class => { optional => 1 },
342             resdb => { optional => 1 },
343             conflict_callback => { optional => 1 },
344             reporting_callback => { optional => 1 }
345             }
346             );
347              
348 0           my $changeset = $args{'changeset'};
349              
350              
351 0           $self->log_debug("Considering changeset ".$changeset->original_sequence_no .
352             " from " . $self->display_name_for_replica($changeset->original_source_uuid));
353              
354             # when we start to integrate a changeset, we need to do a bit of housekeeping
355             # We never want to merge in:
356             # - merge tickets that describe merges from the local record
357              
358             # When we integrate changes, sometimes we will get handed changes we
359             # already know about.
360             # - changes from local
361             # - changes from some other party we've merged from
362             # - merge tickets for the same
363             # we'll want to skip or remove those changesets
364              
365              
366 0 0         if (! $self->should_accept_changeset($changeset) ){
    0          
367             # if it's a changeset we don't care about, mark it as seen and move on
368 0           $self->record_integration_of_changeset($changeset);
369             $args{'reporting_callback'}->( changeset => $changeset, )
370 0 0         if ( $args{'reporting_callback'} );
371 0           return;
372             }
373             elsif ( my $conflict = $self->conflicts_from_changeset($changeset) ) {
374 0           $self->log_debug( "Integrating conflicting changeset "
375             . $changeset->original_sequence_no
376             . " from "
377             . $self->display_name_for_replica( $changeset->original_source_uuid ) );
378 0 0         $args{conflict_callback}->($conflict) if $args{'conflict_callback'};
379 0 0   0     $conflict->resolvers( [ sub { $args{resolver}->(@_) } ] ) if $args{resolver};
  0            
380 0 0         if ( $args{resolver_class} ) {
381 0 0         Prophet::App->require( $args{resolver_class} ) || die $@;
382             $conflict->resolvers(
383             [ sub {
384 0     0     $args{resolver_class}->new->run(@_);
385             }
386 0           ]
387             );
388             }
389 0           my $resolutions = $conflict->generate_resolution( $args{resdb} );
390              
391             #figure out our conflict resolution
392              
393             # IMPORTANT: these should be an atomic unit. dying here would be poor.
394             # BUT WE WANT THEM AS THREE DIFFERENT CHANGESETS
395              
396             # integrate the nullification change
397 0           $self->record_changes( $conflict->nullification_changeset );
398              
399             # integrate the original change
400 0           $self->record_changeset_and_integration($changeset);
401              
402             # integrate the conflict resolution change
403 0           $self->record_resolutions( $conflict->resolution_changeset );
404              
405             $args{'reporting_callback'}->(
406             changeset => $changeset,
407             conflict => $conflict
408 0 0         ) if ( $args{'reporting_callback'} );
409 0           return 1;
410             } else {
411 0           $self->log_debug("Integrating changeset ".$changeset->original_sequence_no .
412             " from " . $self->display_name_for_replica($changeset->original_source_uuid));
413 0           $self->record_changeset_and_integration($changeset);
414 0 0         $args{'reporting_callback'}->( changeset => $changeset ) if ( $args{'reporting_callback'} );
415 0           return 1;
416             }
417             }
418              
419             =head3 record_changeset_and_integration L
420              
421             Given a L, integrate each and every change within that
422             changeset into the handle's replica.
423              
424             If the state handle is in the middle of an edit, the integration of this
425             changeset is recorded as part of that edit; if not, it is recorded as a new
426             edit.
427              
428             =cut
429              
430             sub record_changeset_and_integration {
431 0     0 1   my $self = shift;
432 0           my $changeset = shift;
433              
434 0           $self->begin_edit(source => $changeset);
435 0           $self->record_changes($changeset);
436              
437 0           $self->record_integration_of_changeset($changeset);
438              
439 0           $self->_set_original_source_metadata_for_current_edit($changeset);
440 0           $self->commit_edit;
441              
442 0           return;
443             }
444              
445             =head3 last_changeset_from_source $SOURCE_UUID
446              
447             Returns the last changeset id seen from the replica identified by $SOURCE_UUID.
448              
449             =cut
450              
451             sub last_changeset_from_source {
452 0     0 1   my $self = shift;
453 0           my ($source) = validate_pos( @_, { type => SCALAR } );
454              
455 0           my $changeset_num = $self->fetch_local_metadata('last-changeset-from-'.$source);
456             # 0 is a valid changeset #
457 0 0         return defined $changeset_num ? $changeset_num : -1;
458             }
459              
460              
461             =head3 has_seen_changeset { source_uuid => , sequence_no => }
462              
463             Returns true if we've previously integrated this changeset, even if we
464             originally received it from a different peer.
465              
466             =cut
467              
468             sub has_seen_changeset {
469 0     0 1   my $self = shift;
470 0           my %args = validate( @_, {source_uuid => 1, sequence_no => 1});
471             $self->log_debug("Checking to see if we've ever seen changeset " .
472             $args{sequence_no} . " from " .
473 0           $self->display_name_for_replica($args{source_uuid}));
474              
475             # If the changeset originated locally, we never want it
476 0 0         if ($args{source_uuid} eq $self->uuid ) {
    0          
477 0           $self->log_debug("\t - We have. (It originated locally.)");
478 0           return 1
479             }
480             # Otherwise, if the we have a merge ticket from the source, we don't want
481             # the changeset if the source's sequence # is >= the changeset's sequence
482             # #, we can safely skip it
483             elsif ( $self->last_changeset_from_source( $args{source_uuid} ) >= $args{sequence_no} ) {
484 0           $self->log_debug("\t - We have seen this or a more recent changeset from remote.");
485 0           return 1;
486             } else {
487 0           return undef;
488             }
489             }
490              
491             =head3 changeset_will_conflict L
492              
493             Returns true if any change that's part of this changeset won't apply cleanly to
494             the head of the current replica.
495              
496             =cut
497              
498             sub changeset_will_conflict {
499 0     0 1   my $self = shift;
500 0           my ($changeset) = validate_pos( @_, { isa => "Prophet::ChangeSet" } );
501              
502 0 0         return 1 if ( $self->conflicts_from_changeset($changeset) );
503              
504 0           return undef;
505             }
506              
507             =head3 conflicts_from_changeset L
508              
509             Returns a L object if the supplied L
510             will generate conflicts if applied to the current replica.
511              
512             Returns undef if the current changeset wouldn't generate a conflict.
513              
514             =cut
515              
516             sub conflicts_from_changeset {
517 0     0 1   my $self = shift;
518 0           my ($changeset) = validate_pos( @_, { isa => "Prophet::ChangeSet" } );
519 0           require Prophet::Conflict;
520 0           my $conflict = Prophet::Conflict->new( { changeset => $changeset,
521             prophet_handle => $self} );
522              
523 0           $conflict->analyze_changeset();
524              
525 0 0         return undef unless $conflict->has_conflicting_changes;
526              
527 0           $self->log_debug("Conflicting changeset: ".JSON::to_json($conflict, {allow_blessed => 1}));
528              
529 0           return $conflict;
530             }
531              
532             sub _check_db_uuids_on_merge {
533 0     0     my $self = shift;
534 0           my %args = validate( @_,
535             { for => { isa => 'Prophet::Replica' },
536             force => 0,
537             });
538 0 0 0       if ( $self->db_uuid && $args{for}->db_uuid
      0        
539             && $self->db_uuid ne $args{for}->db_uuid ) {
540 0 0         unless ( $args{'force'} ) {
541             die "You are trying to merge two different databases! This is NOT\n"
542             . "recommended. If you really want to do this, add '--force' to\n"
543             . "your commandline.\n\n"
544             . "Local database: "
545             . $self->db_uuid . "\n"
546             . "Remote database: "
547 0           . $args{for}->db_uuid . "\n";
548             }
549             }
550             }
551              
552             =head3 should_accept_changeset { from => L, changeset => L }
553              
554             Returns true if this replica hasn't yet seen the changeset C.
555              
556             =cut
557              
558             sub should_accept_changeset {
559 0     0 1   my $self = shift;
560 0           my ($changeset) = validate_pos( @_, { changeset => { isa => 'Prophet::ChangeSet' } });
561              
562              
563 0           $self->log_debug("Should I accept " .$changeset->original_sequence_no .
564             " from ".$self->display_name_for_replica($changeset->original_source_uuid));
565 0 0         return undef if (! $changeset->has_changes);
566 0 0 0       return undef if ( $changeset->is_nullification || $changeset->is_resolution );
567 0 0         return undef if $self->has_seen_changeset( sequence_no => $changeset->original_sequence_no, source_uuid => $changeset->original_source_uuid );
568 0           $self->log_debug("Yes, it has changes, isn't a nullification and I haven't seen it before");
569              
570 0           return 1;
571             }
572              
573             =head3 fetch_changesets { after => SEQUENCE_NO }
574              
575             Fetch all changesets from this replica after the local sequence number SEQUENCE_NO.
576              
577             Returns a reference to an array of L objects.
578              
579             See also L for replica implementations to provide
580             streamly interface.
581              
582             =cut
583              
584             sub fetch_changesets {
585 0     0 1   my $self = shift;
586 0           my %args = validate( @_, { after => 1 } );
587 0           my @results;
588              
589 0     0     $self->traverse_changesets( %args, callback => sub { my %args = @_; push @results, $args{changeset} } );
  0            
  0            
590              
591 0           return \@results;
592             }
593              
594             =head2 methods to be implemented by a replica backend
595              
596             =head3 uuid
597              
598             Returns this replica's uuid.
599              
600             =cut
601              
602       0 1   sub uuid {}
603              
604             =head3 latest_sequence_no
605              
606             Returns the sequence # of the most recently committed changeset.
607              
608             =cut
609              
610 0     0 1   sub latest_sequence_no { return undef }
611              
612             =head3 find_or_create_luid { uuid => UUID }
613              
614             Finds or creates a LUID for the given UUID.
615              
616             =cut
617              
618             sub find_or_create_luid {
619 0     0 1   my $self = shift;
620 0           my %args = validate( @_, { uuid => 1 } );
621              
622 0           my $mapping = $self->_read_guid2luid_mappings;
623              
624 0 0         if (!exists($mapping->{ $args{'uuid'} })) {
625 0           $mapping->{ $args{'uuid'} } = $self->_create_luid($mapping);
626 0           $self->_write_guid2luid_mappings($mapping);
627             }
628              
629 0           return $mapping->{ $args{'uuid'} };
630             }
631              
632             sub find_luid_by_uuid {
633 0     0 0   my $self = shift;
634 0           my %args = validate( @_, { uuid => 1 } );
635 0           my $mapping = $self->_read_guid2luid_mappings;
636              
637 0 0         if (!exists($mapping->{ $args{'uuid'} })) {
638 0           return undef;
639             }
640              
641 0           return $mapping->{ $args{'uuid'} };
642              
643             }
644              
645              
646             =head3 find_uuid_by_luid { luid => LUID }
647              
648             Finds the UUID for the given LUID. Returns C if the LUID is not known.
649              
650             =cut
651              
652             sub find_uuid_by_luid {
653 0     0 1   my $self = shift;
654 0           my %args = validate( @_, { luid => 1 } );
655              
656 0           my $mapping = $self->_read_luid2guid_mappings;
657 0           return $mapping->{ $args{'luid'} };
658             }
659              
660             =head3 _create_luid ( 'uuid' => 'luid' )
661              
662             Given a UUID => LUID hash mapping, return a new unused LUID (one
663             higher than the mapping's current highest luid).
664              
665             =cut
666              
667             sub _create_luid {
668 0     0     my $self = shift;
669 0           my $map = shift;
670              
671 0           return ++$map->{'_meta'}{'maximum_luid'};
672             }
673              
674             =head3 _do_userdata_read $PATH $DEFAULT
675              
676             Returns a reference to the parsed JSON contents of the file
677             given by C<$PATH> in the replica's userdata directory.
678              
679             Returns C<$DEFAULT> if the file does not exist.
680              
681             =cut
682              
683             sub _do_userdata_read {
684 0     0     my $self = shift;
685 0           my $path = shift;
686 0           my $default = shift;
687 0   0       my $json = $self->read_userdata( path => $path ) || $default;
688 0           require JSON;
689 0           return JSON::from_json($json, { utf8 => 1 });
690             }
691              
692             =head3 _do_userdata_write $PATH $VALUE
693              
694             serializes C<$VALUE> to JSON and writes it to the file given by C<$PATH>
695             in the replica's userdata directory, creating parent directories as
696             necessary.
697              
698             =cut
699              
700             sub _do_userdata_write {
701 0     0     my $self = shift;
702 0           my $path = shift;
703 0           my $value = shift;
704              
705 0           require JSON;
706 0           my $content = JSON::to_json($value, { canonical => 1, pretty => 0, utf8 => 1 });
707              
708 0           $self->write_userdata(
709             path => $path,
710             content => $content,
711             );
712             }
713              
714             =head3 _upstream_replica_cache_file
715              
716             A string representing the name of the file where replica URLs that have been
717             previously pulled from are cached.
718              
719             =cut
720              
721 0     0     sub _upstream_replica_cache_file { "upstream-replica-cache" }
722              
723             =head3 _read_cached_upstream_replicas
724              
725             Returns a list of cached upstream replica URLs, or an empty list if
726             there are no cached URLs.
727              
728             =cut
729              
730             sub _read_cached_upstream_replicas {
731 0     0     my $self = shift;
732 0 0         return @{ $self->_do_userdata_read( $self->_upstream_replica_cache_file, '[]' ) || [] };
  0            
733             }
734              
735             =head3 _write_cached_upstream_replicas @REPLICAS
736              
737             writes the replica URLs given by C<@REPLICAS> to the upstream replica
738             cache file.
739              
740             =cut
741              
742             sub _write_cached_upstream_replicas {
743 0     0     my $self = shift;
744 0           my @replicas = @_;
745 0           return $self->_do_userdata_write( $self->_upstream_replica_cache_file, [@replicas] );
746             }
747              
748             =head3 _guid2luid_file
749              
750             The file in the replica's userdata directory which contains a serialized
751             JSON UUID => LUID hash mapping.
752              
753             =cut
754              
755 0     0     sub _guid2luid_file { "local-id-cache" }
756              
757             =head3 _read_guid2luid_mappings
758              
759             Returns a UUID => LUID hashref for this replica.
760              
761             =cut
762              
763             sub _read_guid2luid_mappings {
764 0     0     my $self = shift;
765 0           return $self->_do_userdata_read( $self->_guid2luid_file, '{}' );
766             }
767              
768             =head3 _write_guid2luid_mappings ( 'uuid' => 'luid' )
769              
770             Writes the given UUID => LUID hash map to C as serialized
771             JSON.
772              
773             =cut
774              
775             sub _write_guid2luid_mappings {
776 0     0     my $self = shift;
777 0           my $map = shift;
778              
779 0           return $self->_do_userdata_write( $self->_guid2luid_file, $map );
780             }
781              
782             =head3 _read_luid2guid_mappings
783              
784             Returns a LUID => UUID hashref for this replica.
785              
786             =cut
787              
788             sub _read_luid2guid_mappings {
789 0     0     my $self = shift;
790 0           my $guid2luid = $self->_read_guid2luid_mappings(@_);
791 0           delete $guid2luid->{'_meta'};
792 0           my %luid2guid = reverse %$guid2luid;
793 0           return \%luid2guid;
794             }
795              
796             =head3 traverse_changesets { after => SEQUENCE_NO, until => SEQUENCE_NO, callback => sub { my %data = (changeset => undef, @_} }
797              
798             Walk through each changeset in the replica after SEQUENCE_NO, calling the
799             C for each one in turn.
800              
801             =cut
802              
803             sub traverse_changesets {
804 0     0 1   my $class = blessed($_[0]);
805 0           Carp::confess "$class has failed to implement a 'traverse_changesets' method for their replica type.";
806             }
807              
808             =head3 can_read_changesets
809              
810             Returns true if this source is one we know how to read from (and have
811             permission to do so).
812              
813             =cut
814              
815 0     0 1   sub can_read_changesets { undef }
816              
817             =head3 can_write_changesets
818              
819             Returns true if this source is one we know how to write to (and have permission
820             to write to).
821              
822             Returns false otherwise.
823              
824             =cut
825              
826 0     0 1   sub can_write_changesets { undef }
827              
828             =head3 record_resolutions L
829              
830             Given a resolution changeset, record all the resolution changesets as well as
831             resolution records in the local resolution database.
832              
833             Called ONLY on local resolution creation. (Synced resolutions are just synced
834             as records.)
835              
836             =cut
837              
838             sub record_resolutions {
839 0     0 1   my $self = shift;
840 0           my ($changeset) = validate_pos(@_, { isa => 'Prophet::ChangeSet'});
841              
842 0 0         $self->_unimplemented("record_resolutions (since there is no writable handle)")
843             unless ($self->can_write_changesets);
844              
845             # If we have a resolution db handle, record the resolutions there.
846             # Otherwise, record them locally
847 0   0       my $res_handle = $self->resolution_db_handle || $self;
848              
849 0 0         return unless $changeset->has_changes;
850              
851 0           $self->begin_edit(source => $changeset);
852 0           $self->record_changes($changeset);
853 0           $res_handle->_record_resolution($_) for $changeset->changes;
854 0           $self->commit_edit();
855             }
856              
857             =head3 _record_resolution L
858              
859             Called ONLY on local resolution creation. (Synced resolutions are just synced
860             as records.)
861              
862             =cut
863              
864             sub _record_resolution {
865 0     0     my $self = shift;
866 0           my ($change) = validate_pos(@_, { isa => 'Prophet::Change'});
867              
868 0 0         return 1 if $self->record_exists(
869             uuid => $self->uuid,
870             type => '_prophet_resolution-' . $change->resolution_cas
871             );
872              
873             $self->create_record(
874             uuid => $self->uuid,
875             type => '_prophet_resolution-' . $change->resolution_cas,
876             props => {
877             _meta => $change->change_type,
878 0           map { $_->name => $_->new_value } $change->prop_changes
  0            
879             }
880             );
881             }
882              
883             =head2 routines dealing with integrating changesets into a replica
884              
885             =head3 record_changes L
886              
887             Inside an edit (transaction), integrate all changes in this changeset
888             and then call the _after_record_changes() hook.
889              
890             =cut
891              
892             sub record_changes {
893 0     0 1   my $self = shift;
894 0           my ($changeset) = validate_pos(@_, { isa => 'Prophet::ChangeSet'});
895 0 0         $self->_unimplemented ('record_changes') unless ($self->can_write_changesets);
896 0           eval {
897 0           local $SIG{__DIE__} = 'DEFAULT';
898 0 0         my $inside_edit = $self->current_edit ? 1 : 0;
899 0 0         $self->begin_edit(source => $changeset) unless ($inside_edit);
900 0           $self->integrate_changes($changeset);
901 0           $self->_after_record_changes($changeset);
902 0 0         $self->commit_edit() unless ($inside_edit);
903             };
904 0 0         die($@) if ($@);
905             }
906              
907             =head3 integrate_changes L
908              
909             This routine is called by L with a L
910             object. It integrates all changes from that object into the current replica.
911              
912             All bookkeeping, such as opening and closing an edit, is done by
913             L.
914              
915             If your replica type needs to play games to integrate multiple changes as a
916             single record, this is what you'd override.
917              
918             =cut
919              
920             sub integrate_changes {
921 0     0 1   my ($self, $changeset) = validate_pos( @_, {isa => 'Prophet::Replica'},
922             { isa => 'Prophet::ChangeSet' } );
923 0           $self->integrate_change($_, $changeset) for ( $changeset->changes );
924              
925             }
926              
927             =head2 integrate_change L
928              
929             Integrates the given change into the current replica. Used in
930             L.
931              
932             =cut
933              
934             sub integrate_change {
935 0     0 1   my ($self, $change) = validate_pos(@_, { isa => 'Prophet::Replica' },
936             { isa => 'Prophet::Change' },
937             { isa => 'Prophet::ChangeSet' }
938             );
939              
940 0           my %new_props = map { $_->name => $_->new_value } $change->prop_changes;
  0            
941 0 0         if ( $change->change_type eq 'add_file' ) {
    0          
    0          
    0          
942 0           $self->log_debug("add_file: " .$change->record_type. " " .$change->record_uuid);
943 0           $self->create_record( type => $change->record_type, uuid => $change->record_uuid, props => \%new_props);
944             } elsif ( $change->change_type eq 'add_dir' ) {
945 0           $self->log_debug("(IGNORED) add_dir: " .$change->record_type. " " .$change->record_uuid);
946             } elsif ( $change->change_type eq 'update_file' ) {
947 0           $self->log_debug("update_file: " .$change->record_type. " " .$change->record_uuid);
948 0           $self->set_record_props( type => $change->record_type, uuid => $change->record_uuid, props => \%new_props);
949             } elsif ( $change->change_type eq 'delete' ) {
950 0           $self->log_debug("delete_file: " .$change->record_type. " " .$change->record_uuid);
951 0           $self->delete_record( type => $change->record_type, uuid => $change->record_uuid);
952             } else {
953 0           Carp::confess( "Unknown change type: " . $change->change_type );
954             }
955             }
956              
957             =head3 record_integration_of_changeset L
958              
959             This routine records the immediately upstream and original source
960             uuid and sequence numbers for this changeset. Prophet uses this
961             data to make sane choices about later replay and merge operations
962              
963             =cut
964              
965             sub record_integration_of_changeset {
966 0     0 1   my $self = shift;
967 0           my ($changeset) = validate_pos( @_, { isa => 'Prophet::ChangeSet' } );
968              
969 0 0 0       if ( $changeset->original_source_uuid ne $self->uuid
970             && ( $self->last_changeset_from_source( $changeset->original_source_uuid ) < $changeset->original_sequence_no )
971             ) {
972 0           $self->record_last_changeset_from_replica(
973             $changeset->original_source_uuid => $changeset->original_sequence_no );
974             }
975 0 0         if ( $changeset->source_uuid ) {
976 0 0         if ( $self->last_changeset_from_source( $changeset->source_uuid ) < $changeset->sequence_no ) {
977 0           $self->record_last_changeset_from_replica( $changeset->source_uuid => $changeset->sequence_no );
978             }
979             }
980             }
981              
982             sub record_last_changeset_from_replica {
983 0     0 0   my $self = shift;
984 0           my ($uuid, $sequence) = validate_pos(@_, 1,1);
985 0           return $self->store_local_metadata( 'last-changeset-from-' . $uuid => $sequence );
986              
987             }
988              
989             =head2 routines which need to be implemented by any Prophet backend store
990              
991             =head3 uuid
992              
993             Returns this replica's UUID.
994              
995             =head3 create_record { type => $TYPE, uuid => $UUID, props => { key-value pairs } }
996              
997             Create a new record of type C<$TYPE> with uuid C<$UUID> within the current
998             replica.
999              
1000             Sets the record's properties to the key-value hash passed in as the C
1001             argument.
1002              
1003             If called from within an edit, it uses the current edit. Otherwise it
1004             manufactures and finalizes one of its own.
1005              
1006             =head3 delete_record {uuid => $UUID, type => $TYPE }
1007              
1008             Deletes the record C<$UUID> of type C<$TYPE> from the current replica.
1009              
1010             Manufactures its own new edit if C<$self->current_edit> is undefined.
1011              
1012             =head3 set_record_props { uuid => $UUID, type => $TYPE, props => {hash of kv pairs }}
1013              
1014             Updates the record of type C<$TYPE> with uuid C<$UUID> to set each property
1015             defined by the props hash. It does NOT alter any property not defined by the
1016             props hash.
1017              
1018             Manufactures its own current edit if none exists.
1019              
1020             =head3 get_record_props { uuid => $UUID, type => $TYPE, root => $ROOT }
1021              
1022             Returns a hashref of all properties for the record of type C<$TYPE> with uuid
1023             C<$UUID>.
1024              
1025             'root' is an optional argument which you can use to pass in an alternate
1026             historical version of the replica to inspect. Code to look at the immediately
1027             previous version of a record might look like:
1028              
1029             $handle->get_record_props(
1030             type => $record->type,
1031             uuid => $record->uuid,
1032             root => $self->repo_handle->fs->revision_root( $self->repo_handle->fs->youngest_rev - 1 )
1033             );
1034              
1035             =head3 record_exists {uuid => $UUID, type => $TYPE, root => $ROOT }
1036              
1037             Returns true if the record in question exists and false otherwise.
1038              
1039             =head3 list_records { type => $TYPE }
1040              
1041             Returns a reference to a list of all the records of type $TYPE.
1042              
1043             =head3 list_records
1044              
1045             Returns a reference to a list of all the known types in your Prophet database.
1046              
1047             =head3 type_exists { type => $type }
1048              
1049             Returns true if we have any records of type C<$TYPE>.
1050              
1051             =head2 routines which need to be implemented by any _writable_ prophet backend store
1052              
1053             =head2 optional routines which are provided for you to override with backend-store specific behaviour
1054              
1055             =head3 _after_record_changes L
1056              
1057             Called after the replica has integrated a new changeset but before closing the
1058             current transaction/edit.
1059              
1060             The SVN backend, for example, used this to record author metadata about this
1061             changeset.
1062              
1063             =cut
1064              
1065             sub _after_record_changes {
1066 0     0     return 1;
1067             }
1068              
1069             =head3 _set_original_source_metadata_for_current_edit
1070              
1071             Sets C and C for the current edit.
1072              
1073             =cut
1074              
1075       0     sub _set_original_source_metadata_for_current_edit {}
1076              
1077             =head2 helper routines
1078              
1079             =cut
1080              
1081             =head3 log $MSG
1082              
1083             Logs the given message to C (but only if the C
1084             environmental variable is set).
1085              
1086             =cut
1087              
1088             sub log {
1089 0     0 1   my $self = shift;
1090 0           my ($msg) = validate_pos(@_, 1);
1091 0 0         Carp::confess unless ($self->app_handle);
1092 0           $self->app_handle->log($msg);
1093             }
1094              
1095             sub log_debug {
1096 0     0 0   my $self = shift;
1097 0           my $msg = shift;
1098 0           $self->app_handle->log_debug($self->display_name_for_replica.": " .$msg);
1099             }
1100              
1101             =head2 log_fatal $MSG
1102              
1103             Logs the given message and dies with a stack trace.
1104              
1105             =cut
1106              
1107             sub log_fatal {
1108 0     0 1   my $self = shift;
1109              
1110             # always skip this fatal_error function when generating a stack trace
1111 0           local $Carp::CarpLevel = $Carp::CarpLevel + 1;
1112 0 0         if ( eval {$self->app_handle }) {
  0            
1113 0           $self->app_handle->log_fatal(@_);
1114             } else {
1115 0           die join('',@_) ."\n";
1116             }
1117             }
1118              
1119             =head2 changeset_creator
1120              
1121             The string to use as the creator of a changeset.
1122              
1123             =cut
1124              
1125             sub changeset_creator {
1126 0     0 1   my $self = shift;
1127 0           return $self->app_handle->current_user_email;
1128             }
1129              
1130             =head2 display_name_for_replica [uuid]
1131              
1132             If the user has a "friendly" name for this replica, then use it. Otherwise,
1133             display the replica's uuid.
1134              
1135             If you pass in a uuid, it will be used instead of the replica's uuid.
1136              
1137             =cut
1138              
1139             sub display_name_for_replica {
1140 0     0 1   my $self = shift;
1141 0   0       my $uuid = shift || $self->uuid;
1142              
1143 0 0         return $uuid if !$self->app_handle;
1144              
1145 0           return $self->app_handle->display_name_for_replica($uuid);
1146             }
1147              
1148             __PACKAGE__->meta->make_immutable();
1149 40     40   372 no Any::Moose;
  40         69  
  40         265  
1150              
1151             1;