File Coverage

blib/lib/Catmandu/Store/ElasticSearch/Bag.pm
Criterion Covered Total %
statement 15 59 25.4
branch 0 24 0.0
condition 0 11 0.0
subroutine 5 14 35.7
pod 0 7 0.0
total 20 115 17.3


line stmt bran cond sub pod time code
1             package Catmandu::Store::ElasticSearch::Bag;
2              
3 1     1   6 use Catmandu::Sane;
  1         3  
  1         11  
4 1     1   254 use Moo;
  1         2  
  1         6  
5 1     1   1713 use Catmandu::Hits;
  1         101876  
  1         42  
6 1     1   811 use Catmandu::Store::ElasticSearch::Searcher;
  1         3  
  1         38  
7 1     1   702 use Catmandu::Store::ElasticSearch::CQL;
  1         5  
  1         2362  
8              
9             with 'Catmandu::Bag';
10             with 'Catmandu::Searchable';
11             with 'Catmandu::Buffer';
12              
13             has cql_mapping => (is => 'ro'); # TODO move to Searchable
14             has on_error => (is => 'ro', default => sub { 'IGNORE'} );
15              
16             sub generator {
17 0     0 0   my ($self) = @_;
18 0           my $limit = $self->buffer_size;
19             sub {
20 0     0     state $scroller = $self->store->elastic_search->scrolled_search({
21             search_type => 'scan',
22             query => {match_all => {}},
23             type => $self->name,
24             });
25 0           state @hits;
26 0 0         @hits = $scroller->next($limit) unless @hits;
27 0   0       (shift(@hits) || return)->{_source};
28 0           };
29             }
30              
31             sub count {
32 0     0 0   my ($self) = @_;
33 0           $self->store->elastic_search->count(type => $self->name)->{count};
34             }
35              
36             sub get {
37             my ($self, $id) = @_;
38             my $res = $self->store->elastic_search->get(
39             type => $self->name,
40             ignore_missing => 1,
41             id => $id,
42             );
43             return $res->{_source} if $res;
44             return;
45             }
46              
47             sub add {
48             my ($self, $data) = @_;
49              
50             $self->buffer_add({index => {
51             type => $self->name,
52             id => $data->{_id},
53             data => $data,
54             }});
55              
56             if ($self->buffer_is_full) {
57             $self->commit;
58             }
59             }
60              
61             sub delete {
62             my ($self, $id) = @_;
63              
64             $self->buffer_add({delete => {
65             type => $self->name,
66             id => $id,
67             }});
68              
69             if ($self->buffer_is_full) {
70             $self->commit;
71             }
72             }
73              
74             sub delete_all {
75 0     0 0   my ($self) = @_;
76 0           my $es = $self->store->elastic_search;
77 0           $es->delete_by_query(
78             query => {match_all => {}},
79             type => $self->name,
80             );
81 0           $es->refresh_index;
82             }
83              
84             sub delete_by_query {
85             my ($self, %args) = @_;
86             my $es = $self->store->elastic_search;
87             $es->delete_by_query(
88             query => $args{query},
89             type => $self->name,
90             );
91             $es->refresh_index;
92             }
93              
94             sub commit { # TODO optimize
95 0     0 0   my ($self) = @_;
96 0 0         return 1 unless $self->buffer_used;
97 0           $self->store->elastic_search->bulk(actions => $self->buffer, refresh => 1, on_error => $self->on_error);
98 0           $self->clear_buffer;
99 0           return 1;
100             }
101              
102             sub search {
103             my ($self, %args) = @_;
104              
105             my $start = delete $args{start};
106             my $limit = delete $args{limit};
107             my $bag = delete $args{reify};
108              
109             if ($bag) {
110             $args{fields} = [];
111             }
112              
113             my $res = $self->store->elastic_search->search({
114             %args,
115             type => $self->name,
116             from => $start,
117             size => $limit,
118             });
119              
120             my $docs = $res->{hits}{hits};
121              
122             my $hits = {
123             start => $start,
124             limit => $limit,
125             total => $res->{hits}{total},
126             };
127              
128             if ($bag) {
129             $hits->{hits} = [ map { $bag->get($_->{_id}) } @$docs ];
130             } elsif ($args{fields}) {
131             $hits->{hits} = [ map { $_->{fields} || {} } @$docs ];
132             } else {
133             $hits->{hits} = [ map { $_->{_source} } @$docs ];
134             }
135              
136             $hits = Catmandu::Hits->new($hits);
137              
138             if ($args{facets}) {
139             $hits->{facets} = $res->{facets};
140             }
141              
142             if ($args{highlight}) {
143             for my $hit (@$docs) {
144             if (my $hl = $hit->{highlight}) {
145             $hits->{highlight}{$hit->{_id}} = $hl;
146             }
147             }
148             }
149              
150             $hits;
151             }
152              
153             sub searcher {
154             my ($self, %args) = @_;
155             Catmandu::Store::ElasticSearch::Searcher->new(%args, bag => $self);
156             }
157              
158             sub translate_sru_sortkeys {
159 0     0 0   my ($self, $sortkeys) = @_;
160 0           [ grep { defined $_ } map { $self->_translate_sru_sortkey($_) } split /\s+/, $sortkeys ];
  0            
  0            
161             }
162              
163             sub _translate_sru_sortkey {
164 0     0     my ($self, $sortkey) = @_;
165 0           my ($field, $schema, $asc) = split /,/, $sortkey;
166 0 0         $field || return;
167 0 0         if (my $map = $self->cql_mapping) {
168 0           $field = lc $field;
169 0 0         $field =~ s/(?<=[^_])_(?=[^_])//g if $map->{strip_separating_underscores};
170 0   0       $map = $map->{indexes} || return;
171 0   0       $map = $map->{$field} || return;
172 0 0         $map->{sort} || return;
173 0 0 0       if (ref $map->{sort} && $map->{sort}{field}) {
    0          
    0          
174 0           $field = $map->{sort}{field};
175             } elsif (ref $map->{field}) {
176 0           $field = $map->{field}->[0];
177             } elsif ($map->{field}) {
178 0           $field = $map->{field};
179             }
180             }
181 0   0       $asc //= 1;
182 0 0         +{ $field => $asc ? 'asc' : 'desc' };
183             }
184              
185             sub translate_cql_query {
186 0     0 0   my ($self, $query) = @_;
187 0           Catmandu::Store::ElasticSearch::CQL->new(mapping => $self->cql_mapping)->parse($query);
188             }
189              
190             sub normalize_query {
191 0     0 0   my ($self, $query) = @_;
192 0 0         if (ref $query) {
    0          
193 0           $query;
194             } elsif ($query) {
195 0           {query_string => {query => $query}};
196             } else {
197 0           {match_all => {}};
198             }
199             }
200              
201             =head1 SEE ALSO
202              
203             L, L
204              
205             =cut
206              
207             1;