File Coverage

blib/lib/KiokuDB/Backend/CouchDB.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3             package KiokuDB::Backend::CouchDB;
4 1     1   34512 use Moose;
  0            
  0            
5             use Moose::Util::TypeConstraints;
6             use Data::Stream::Bulk::Util qw(bulk);
7              
8             use AnyEvent::CouchDB;
9             use Carp 'confess';
10             use Try::Tiny;
11             use List::MoreUtils qw{ any };
12             use Time::HiRes qw/gettimeofday tv_interval/;
13              
14             use KiokuDB::Backend::CouchDB::Exceptions;
15              
16             use namespace::clean -except => 'meta';
17              
18             our $VERSION = '0.16';
19              
20             # TODO Read revision numbers into rev field and use for later conflict resolution
21              
22             with qw(
23             KiokuDB::Backend
24             KiokuDB::Backend::Serialize::JSPON
25             KiokuDB::Backend::Role::UnicodeSafe
26             KiokuDB::Backend::Role::Clear
27             KiokuDB::Backend::Role::Scan
28             KiokuDB::Backend::Role::Query::Simple::Linear
29             KiokuDB::Backend::Role::TXN::Memory
30             KiokuDB::Backend::Role::Concurrency::POSIX
31             );
32              
33             # TODO Remove TXN::Memory or ensure that it works as it should
34              
35             has create => (
36             isa => "Bool",
37             is => "ro",
38             default => 0,
39             );
40              
41             has conflicts => (
42             is => 'rw',
43             isa => enum([qw{ overwrite confess ignore throw }]),
44             default => 'throw'
45             );
46            
47              
48             sub BUILD {
49             my $self = shift;
50              
51             if ( $self->create ) {
52             my $e = do {local $@; eval { $self->db->create->recv }; $@ };
53              
54             # Throw errors except if its because the database already exists
55             if ( $e ) {
56             if ( my($error) = grep { exists $_->{error} } @$e ) {
57             if( $error->{error} ne 'file_exists' ) {
58             die "$error->{error}: $error->{reason}";
59             }
60             }
61             }
62             }
63             }
64              
65             has db => (
66             isa => "AnyEvent::CouchDB::Database",
67             is => "ro",
68             handles => [qw(document)],
69             );
70              
71             has '+id_field' => ( default => "_id" );
72             has '+class_field' => ( default => "class" );
73             has '+class_meta_field' => ( default => "class_meta" );
74             has '+deleted_field' => ( default => "_deleted" );
75              
76             our @couch_meta_fields = qw{ _rev _attachments _conflicts };
77              
78              
79             sub delete {
80             my ( $self, @ids_or_entries ) = @_;
81            
82             my $db = $self->db;
83            
84             warn "Remove: ", join(', ', @ids_or_entries);
85            
86             for(@ids_or_entries) {
87             if(blessed($_)) {
88             my $meta = $self->find_meta($_);
89             $db->remove_doc({
90             _id => $_->id,
91             ($meta->{_rev} ? (_rev => $meta->{_rev}) : ())
92             });
93             } else {
94             $db->remove_doc({_id => $_});
95             }
96             }
97            
98             return;
99             }
100              
101             sub new_from_dsn_params {
102             my ( $self, %args ) = @_;
103              
104             my $db = exists $args{db}
105             ? couch($args{uri})->db($args{db})
106             : couchdb($args{uri});
107            
108             $self->new(%args, db => $db);
109             }
110              
111             # Collect metadata for a given entry
112             sub find_meta {
113             my ( $self, $entry ) = @_;
114             my $meta;
115              
116             my $prev = $entry;
117             # Go backwards in history to collect metadata
118             # TODO Consider whether this should be necessary - why not store this in every entry?
119             while($prev and any {not exists $meta->{$_}} @couch_meta_fields) {
120             if(my $backend_data = $prev->backend_data) {
121             for(@couch_meta_fields) {
122             $meta->{$_} = $backend_data->{$_}
123             if $backend_data->{$_} and not exists $meta->{$_};
124             }
125             }
126             $prev = $prev->prev;
127             }
128            
129             return $meta;
130             }
131              
132             sub commit_entries {
133             my ( $self, @entries ) = @_;
134            
135             my @docs;
136             my $db = $self->db;
137            
138             my $start = [ gettimeofday ];
139              
140             foreach my $entry ( @entries ) {
141            
142             my $meta = $self->find_meta($entry);
143            
144             my $collapsed = $self->collapse_jspon($entry);
145              
146             for(@couch_meta_fields) {
147             $collapsed->{$_} = $meta->{$_}
148             if $meta->{$_}
149             }
150            
151             push @docs, $collapsed;
152              
153             $entry->backend_data($collapsed);
154              
155             }
156              
157             # TODO couchdb <= 0.8 (possibly 0.9 too) will return a hash ref here, which will fail. Detect and handle.
158             my $data = $self->db->bulk_docs(\@docs)->recv;
159              
160             if ( my @errors = grep { exists $_->{error} } @$data ) {
161              
162             if($self->conflicts eq 'confess') {
163             no warnings 'uninitialized';
164             confess "Errors in update: " . join(", ", map { "$_->{error} (on ID $_->{id} ($_->{rev}, $_->{error}, $_->{reason}))" } @errors);
165             } elsif($self->conflicts eq 'overwrite' or $self->conflicts eq 'throw') {
166             my %conflicts;
167             my @conflicts;
168             my @other_errors;
169             for(@errors) {
170             if($_->{error} eq 'conflict') {
171             push @conflicts, $_->{id};
172             } else {
173             push @other_errors, $_;
174             }
175             }
176             if(@other_errors) {
177             confess "Errors in update: " . join(", ", map { "$_->{error} (on ID $_->{id} ($_->{rev}))" } @other_errors);
178             }
179            
180             # Updating resulted in conflicts that we handle by overwriting the change
181             my $old_docs = $db->open_docs([@conflicts], {conflicts => 'true'})->recv;
182             if(exists $old_docs->{error}) {
183             confess "Updating ids ", join(', ', @conflicts), " failed during conflict resolution: $old_docs->{error}.";
184             }
185             my @old_docs = @{$old_docs->{rows}};
186              
187             if($self->conflicts eq 'overwrite') {
188             my @re_update_docs;
189             foreach my $old_doc (@old_docs) {
190             my($new_doc) = grep {$old_doc->{doc}{_id} eq $_->{_id}} @docs;
191             $new_doc->{_rev} = $old_doc->{doc}{_rev};
192             push @re_update_docs, $new_doc;
193             }
194             # Handle errors that has arised when trying the second update
195             if(@errors = grep { exists $_->{error} } @{$self->db->bulk_docs(\@re_update_docs)->recv}) {
196             confess "Updating ids ", join(', ', @conflicts), " failed during conflict resolution: ",
197             join(', ', map { $_->{error} . ' on ' . $_->{id} } @errors);
198             }
199             } else { # throw
200             my $conflicts = [];
201             my %docs;
202             for(@docs) {
203             $docs{$_->{_id}} = $_;
204             }
205             for(my $i=0; $i < @conflicts; $i++) {
206             push @$conflicts, {
207             new => $docs{$conflicts[$i]}->{data},
208             old => $old_docs[$i]->{doc}{data}
209             };
210             }
211             KiokuDB::Backend::CouchDB::Exception::Conflicts->throw(
212             conflicts => $conflicts,
213             error => 'Conflict while storing objects'
214             );
215             }
216             }
217             # $self->conflicts eq 'ignore' here, so don't do anything
218             }
219              
220             foreach my $rev ( map { $_->{rev} } @$data ) {
221             ( shift @docs )->{_rev} = $rev;
222             }
223              
224             if ($ENV{KIOKU_COUCH_TRACE}){
225             my $end = [ gettimeofday ];
226             warn "[KIOKU COUCH TRACE] KiokuDB::Backend::CouchDB::commit_entries() [", tv_interval($start, $end),"s]:\n";
227             warn "[KIOKU COUCH TRACE] ".$_->id.', ['.($_->class || '')."]\n" for @entries;
228             }
229             }
230              
231             sub get_from_storage {
232             my ( $self, @ids ) = @_;
233              
234             my @result;
235              
236             my $error_count = 0;
237             my $max_errors = 2;
238             my $retry_delay = 5;
239             my $data;
240             my $error;
241             my $start = [ gettimeofday ];
242             while(not $data and $error_count <= $max_errors) {
243             $error = undef;
244             try { $data = $self->db->open_docs(\@ids)->recv }
245             catch { $error_count++; $error = $_ };
246            
247             # Always retry immediately after first failed connect, then apply delay
248             sleep $retry_delay if $error_count > 1;
249            
250             if(not $error and not $data) {
251             die "Call to CouchDB returned false ($data)";
252             }
253             }
254             die $error->[0]{Reason} if ref $error eq 'ARRAY' and ref $error->[0] eq 'HASH' and $error->[0]{Reason};
255             die $error if $error;
256              
257             die('Invalid response from CouchDB (rows missing or not array)', $data)
258             unless $data->{rows} and ref $data->{rows} eq 'ARRAY';
259              
260             my @deleted;
261             my @not_found;
262             my @unknown;
263             my @errors;
264             my @docs;
265             for(@{ $data->{rows} }) {
266             if($_->{doc} ) {
267             # TODO We may have to check if $_->{doc} has a valid value and treat as error otherwise
268             push @docs, $_->{doc};
269             } elsif($_->{value}{deleted}) {
270             push @deleted, $_;
271             } elsif(my $error = $_->{error}) {
272             if($error eq 'not_found') {
273             push @not_found, $_;
274             } else {
275             push @errors, $_;
276             }
277             } else {
278             push @unknown, $_;
279             }
280             }
281             if(@errors) {
282             use Data::Dump 'pp';
283             die 'Error on fetch from CouchDB.', pp @errors;
284             }
285             if(@unknown) {
286             use Data::Dump 'pp';
287             die 'Unknown response from CouchDB.', pp @unknown;
288             }
289              
290             # TODO What to do with deleted entries?
291             # TODO What to do with entries not found?
292            
293             if ($ENV{KIOKU_COUCH_TRACE}){
294             my $end = [ gettimeofday ];
295             warn "[KIOKU COUCH TRACE] KiokuDB::Backend::CouchDB::get_from_storage() [", tv_interval($start, $end),"s]:\n";
296             warn "[KIOKU COUCH TRACE] ".$_->{_id}.', ['.($_->{class} || '')."]\n" for @docs;
297             warn "[KIOKU COUCH TRACE] (not found) ".$_->{key}."\n" for @not_found;
298             }
299            
300             return map { $self->deserialize($_) } @docs;
301             }
302              
303             sub deserialize {
304             my ( $self, $doc ) = @_;
305              
306             my %doc = %{ $doc };
307              
308             return $self->expand_jspon(\%doc, backend_data => $doc );
309             }
310              
311             sub clear {
312             my $self = shift;
313              
314             # FIXME TXN
315              
316             $self->db->drop->recv;
317             $self->db->create->recv;
318             }
319              
320             sub all_entries {
321             my ( $self, %args ) = @_;
322              
323             # FIXME pagination
324             my @ids = map { $_->{id} } @{ $self->db->all_docs->recv->{rows} };
325              
326             if ( my $l = $args{live_objects} ) {
327             my %entries;
328             @entries{@ids} = $l->ids_to_entries(@ids);
329              
330             my @missing = grep { not $entries{$_} } @ids;
331              
332             @entries{@missing} = $self->get(@missing);
333              
334             return bulk(values %entries);
335             } else {
336             return bulk($self->get(@ids));
337             }
338             }
339              
340             __PACKAGE__->meta->make_immutable;
341              
342             1;
343              
344             __END__
345              
346             =pod
347              
348             =head1 NAME
349              
350             KiokuDB::Backend::CouchDB - CouchDB backend for L<KiokuDB>
351              
352             =head1 SYNOPSIS
353              
354             KiokuDB->connect( "couchdb:uri=http://127.0.0.1:5984/database" );
355              
356             =head1 DESCRIPTION
357              
358             This backend provides L<KiokuDB> support for CouchDB using L<AnyEvent::CouchDB>.
359              
360             =head1 DEBUGGING
361              
362             Set the environment variable KIOKU_COUCH_TRACE if you want debug output
363             describing what CouchDB bound requests are being processed.
364              
365             =head1 TRANSACTION SUPPORT
366              
367             This backend does not currently support transactions.
368              
369             =head1 ATTRIBUTES
370              
371             =over 4
372              
373             =item db
374              
375             An L<AnyEvent::CouchDB::Database> instance.
376              
377             Required.
378              
379             =item create
380              
381             Whether or not to try and create the database on instantiaton.
382              
383             Defaults to false.
384              
385             =back
386              
387             =head1 SEE ALSO
388              
389             L<KiokuX::CouchDB::Role::View>.
390              
391             =head1 VERSION CONTROL
392              
393             L<http://github.com/mzedeler/kiokudb-backend-couchdb>
394              
395             =head1 AUTHOR
396              
397             Yuval Kogman E<lt>nothingmuch@woobling.orgE<gt>
398              
399             =head1 CONTRIBUTORS
400              
401             Michael Zedeler E<lt>michael@zedeler.dk<gt>, Anders Bruun Borch E<lt>cyborch@deck.dk<gt>,
402             Martin Parm E<lt>parmus@parmus.dk<gt>.
403              
404             =head1 COPYRIGHT
405              
406             Copyright (c) 2008, 2009 Yuval Kogman, Infinity Interactive. All
407             rights reserved This program is free software; you can redistribute
408             it and/or modify it under the same terms as Perl itself.
409              
410             Copyright (c) 2010 Leasingbørsen. All rights reserved. This program
411             is free software; you can redistribute it and/or modify it under
412             the same terms as Perl itself.
413              
414             =cut