File Coverage

blib/lib/Catmandu/Importer/RDF.pm
Criterion Covered Total %
statement 149 166 89.7
branch 58 78 74.3
condition 12 27 44.4
subroutine 29 29 100.0
pod 0 3 0.0
total 248 303 81.8


line stmt bran cond sub pod time code
1             package Catmandu::Importer::RDF;
2              
3 15     15   2089626 use open ':std', ':encoding(utf8)';
  15         9591  
  15         94  
4 15     15   128702 use namespace::clean;
  15         44  
  15         142  
5 15     15   2879 use Catmandu::Sane;
  15         30  
  15         117  
6 15     15   3097 use Moo;
  15         30  
  15         106  
7 15     15   19332 use RDF::Trine::Parser;
  15         10261113  
  15         446  
8 15     15   108 use RDF::Trine::Model;
  15         108  
  15         291  
9 15     15   82 use RDF::Trine::Store::SPARQL;
  15         28  
  15         328  
10 15     15   4583 use RDF::Trine::Store::LDF;
  15         9419445  
  15         570  
11 15     15   130 use RDF::Trine::Store;
  15         36  
  15         299  
12 15     15   114 use RDF::Query;
  15         45  
  15         358  
13 15     15   74 use RDF::LDF;
  15         34  
  15         250  
14 15     15   4890 use RDF::aREF;
  15         109881  
  15         985  
15 15     15   109 use RDF::aREF::Encoder;
  15         37  
  15         256  
16 15     15   73 use RDF::NS;
  15         41  
  15         227  
17 15     15   4372 use IO::Pipe;
  15         14272  
  15         427  
18 15     15   93 use JSON;
  15         32  
  15         156  
19 15     15   5382 use LWP::UserAgent::CHICaching;
  15         1531050  
  15         30078  
20              
21             our $VERSION = '0.32';
22              
23             with 'Catmandu::RDF';
24             with 'Catmandu::Importer';
25              
26             has url => (
27             is => 'ro'
28             );
29              
30             has base => (
31             is => 'ro',
32             lazy => 1,
33             builder => sub {
34 8 50   8   541 defined $_[0]->file ? "file://".$_[0]->file : "http://example.org/";
35             }
36             );
37              
38             has encoder => (
39             is => 'ro',
40             lazy => 1,
41             builder => sub {
42 30     30   920 my $ns = $_[0]->ns;
43 30 100 50     1749 RDF::aREF::Encoder->new(
44             ns => (($ns // 1) ? $ns : { }),
45             subject_map => !$_[0]->predicate_map,
46             );
47             }
48             );
49              
50             has sparql => (
51             is => 'ro',
52             lazy => 1,
53             trigger => sub {
54             my ($sparql, $ns) = ($_[1], $_[0]->ns);
55             $sparql = do { local (@ARGV,$/) = $sparql; <> } if $sparql =~ /^\S+$/ && -r $sparql;
56             my %prefix;
57             # guess requires prefixes (don't override existing). Don't mind false positives
58             $prefix{$_} = 1 for ($sparql =~ /\s([a-z][a-z0-0_-]*):/mig);
59             delete $prefix{$_} for ($sparql =~ /PREFIX\s+([^:]+):/mg);
60             $_[0]->{sparql} = join "\n", (map { $ns->SPARQL($_) } keys %prefix), $sparql;
61             }
62             );
63              
64             has sparql_result => (
65             is => 'ro',
66             default => sub { 'simple' }
67             );
68              
69             has predicate_map => (
70             is => 'ro',
71             );
72              
73             has triples => (
74             is => 'ro',
75             );
76              
77             has cache => (
78             is => 'ro',
79             default => sub { 0 }
80             );
81              
82             has cache_options => (
83             is => 'ro',
84             default => sub { +{
85             driver => 'Memory',
86             global => 1 ,
87             max_size => 1024*1024
88             } }
89             );
90              
91             has speed => (
92             is => 'ro',
93             );
94              
95             sub BUILD {
96 52     52 0 1040 my ($self) = @_;
97              
98 52 50       1179 if ($self->cache) {
99 0   0     0 my $options = $self->cache_options // {};
100 0         0 my $cache = CHI->new( %$options );
101 0         0 my $ua = LWP::UserAgent::CHICaching->new(cache => $cache);
102 0         0 RDF::Trine->default_useragent($ua);
103             }
104             }
105              
106             sub generator {
107             my ($self) = @_;
108              
109             if ($self->sparql) {
110             return $self->sparql_generator;
111             } else {
112             return $self->rdf_generator;
113             }
114             }
115              
116             sub sparql_generator {
117 3     3 0 10 my ($self) = @_;
118              
119 3 50       22 warn "--triples not active for sparql queries" if ($self->triples);
120 3 50       11 warn "--predicate_map not active for sparql queries" if ($self->predicate_map);
121              
122 3         47 my $encoder = RDF::aREF::Encoder->new( ns => {} ); # never return qnames
123              
124             sub {
125 3     3   46 state $stream = $self->_sparql_stream;
126 3 50 33     2713 if (defined($stream) && defined(my $row = $stream->next)) {
127 3 50 66     703709 if (ref $row eq 'RDF::Query::VariableBindings' || ref $row eq 'RDF::Trine::VariableBindings') {
128 3         11 my $ref = {};
129 3         50 for (keys %$row) {
130 4         58 my $val = $row->{$_};
131             $ref->{$_} = $self->sparql_result eq 'aref'
132 4 50       33 ? $encoder->object($val) : do { # TODO: clean up
133 4 100       38 if ( $val->is_resource ) {
    50          
134 2         26 $val->uri_value;
135             } elsif ( $val->is_literal) {
136 2         56 $val->literal_value;
137             } else {
138 0         0 $val->as_string
139             }
140             };
141             }
142 3         91 return $ref;
143             } else {
144 0         0 die "Expected a RDF::Query::VariableBindings or RDF::Trine::VariableBindings but got a " . ref($row);
145             }
146             } else {
147 0         0 return ($stream = undef);
148             }
149 3         271 };
150             }
151              
152             sub rdf_generator {
153 41     41 0 147 my ($self) = @_;
154             sub {
155 49     49   679 state $stream = $self->_hashref_stream;
156 38 50       415 return unless $stream;
157              
158 38         352 my $aref = { };
159              
160 38 100       436 if ($self->triples) {
161 10 100       80 if (my $hashref = $stream->()) {
162 8         350 $self->encoder->add_hashref($hashref, $aref);
163             }
164             else {
165 2         194 return ($stream = undef);
166             }
167             }
168             else {
169             # TODO: include namespace mappings if requested
170 28         304 while (my $hashref = $stream->()) {
171 103         4265 $self->encoder->add_hashref(
172             $hashref,
173             $aref
174             );
175             }
176              
177 28 100       574 if ($self->url) {
178 6         150 $aref->{_url} = $self->url;
179             }
180              
181 28         2031 $stream = undef;
182             }
183              
184 36 100       19294 if ($self->url) {
185             # RDF::Trine::Parser parses data from URL to UTF-8
186             # but we want internal character sequences
187 6         61 _utf8_decode($aref);
188             }
189              
190 36         1206 return $aref;
191 41         491 };
192             }
193              
194             sub _utf8_decode {
195 18 50   18   108 if (ref $_[0] eq 'HASH') {
196             # FIXME: UTF-8 in property values
197 18         40 foreach (values %{$_[0]}) {
  18         140  
198 42 100       237 ref($_) ? _utf8_decode($_) : utf8::decode($_);
199             }
200             } else {
201 0         0 foreach (@{$_[0]}) {
  0         0  
202 0 0       0 ref($_) ? _utf8_decode($_) : utf8::decode($_);
203             }
204             }
205             }
206              
207             sub _sparql_stream {
208 3     3   10 my ($self) = @_;
209              
210 3 50       15 die "need an url" unless $self->url;
211              
212 3         68 $self->log->info("parsing: " . $self->sparql);
213              
214 3         343 my $store;
215              
216             # Check if this server is an LDF server
217 3         61 my $ldf_client = RDF::LDF->new(url => $self->url);
218              
219 3 100       1366 if ($ldf_client->is_fragment_server) {
220 2         604658 $store = RDF::Trine::Store->new_with_config({
221             storetype => 'LDF',
222             url => $self->url
223             });
224             }
225             else {
226 1         8942 $store = RDF::Trine::Store->new_with_config({
227             storetype => 'SPARQL',
228             url => $self->url
229             });
230             }
231              
232 3 50       598836 unless ($store) {
233 0         0 $self->log->error("failed to connect to " . $self->url);
234 0         0 return;
235             }
236              
237 3         38 my $model = RDF::Trine::Model->new($store);
238              
239 3         127 my $rdf_query = RDF::Query->new($self->sparql);
240              
241 3 50       16588 unless ($rdf_query) {
242 0         0 $self->log->error("failed to parse " . $self->sparql);
243 0         0 return;
244             }
245              
246 3         22 my $iterator = $rdf_query->execute($model);
247              
248 3 50       44406 unless ($iterator) {
249 0         0 $self->log->error("failed to execute " . $self->sparql . " at " . $self->url);
250 0         0 return;
251             }
252             }
253              
254             sub _hashref_stream {
255 41     41   112 my ($self) = @_;
256              
257             # Create a pipe stream to convert a callback handler into an iterator
258 41         445 my $pipe = IO::Pipe->new();
259              
260 41 100       143716 if (my $pid = fork()) {
261             # parent
262 30         1802 $pipe->reader();
263              
264 30         7396 binmode($pipe,':encoding(UTF-8)');
265              
266             return sub {
267 141     141   1577066 state $line = <$pipe>;
268              
269 141 100       4427 return decode_json($line) if defined($line);
270              
271 30         22072373 waitpid($pid,0);
272              
273 30         1045 return undef;
274 30         11196 };
275             }
276             else {
277             # child
278 11         1322 $pipe->writer();
279              
280 11         2864 binmode($pipe,':encoding(UTF-8)');
281              
282 11 100       4495 my $parser = $self->type
283             ? RDF::Trine::Parser->new( $self->type ) : 'RDF::Trine::Parser';
284              
285             my $handler = sub {
286 41     41   366562 my $triple = shift;
287 41         162 state $start = time;
288 41         130 state $count = 0;
289              
290 41 100       361 my $subject = $triple->subject->is_blank ?
291             '_:' . $triple->subject->blank_identifier :
292             $triple->subject->uri_value;
293 41 50       2455 my $predicate = $triple->predicate->is_blank ?
294             '_:' . $triple->predicate->blank_identifier :
295             $triple->predicate->value;
296 41 100       1428 my $value = $triple->object->is_literal ?
    100          
297             $triple->object->literal_value :
298             $triple->object->is_blank ?
299             '_:' . $triple->object->blank_identifier :
300             $triple->object->uri_value;
301 41         2506 my $type = lc $triple->object->type;
302 41 100       824 $type = 'bnode' if $type eq 'blank';
303 41 100       200 my $lang = $triple->object->is_literal ? $triple->object->literal_value_language : undef;
304 41 100       1055 my $datatype = $triple->object->is_literal ? $triple->object->literal_datatype : undef;
305              
306             # Create the RDF::Trine type RDF/JSON RDF::aREF can parse
307 41         1033 my $hashref = {};
308              
309 41         531 $hashref->{$subject}->{$predicate}->[0]->{type} = $type;
310 41 100       328 $hashref->{$subject}->{$predicate}->[0]->{datatype} = $datatype if $datatype;
311 41 100       254 $hashref->{$subject}->{$predicate}->[0]->{lang} = $lang if $lang;
312 41         249 $hashref->{$subject}->{$predicate}->[0]->{value} = $value;
313              
314 41         1182 print $pipe encode_json($hashref) , "\n";
315              
316 41         158 $count++;
317              
318 41 0 33     725 if ($self->speed && ($count % 100 == 0) && (my $elapsed = time - $start) ) {
      33        
319 0         0 printf STDERR "triples %9d (%d/sec)\n" , $count , $count/$elapsed;
320             }
321 11         26719 };
322              
323 11 100       321 if ($self->url) {
324 3         170 $parser->parse_url( $self->url, $handler);
325             }
326             else {
327 8   50     1683 my $from_scalar = (ref $self->file // '') eq 'SCALAR';
328              
329 8 50 66     1883 if (!$self->type and $self->file and !$from_scalar) {
      66        
330 6         448 $parser = $parser->guess_parser_by_filename($self->file)->new;
331             }
332              
333 8 100       27145 if ($from_scalar) {
334 2         131 $parser->parse( $self->base, ${$self->file}, $handler );
  2         278  
335             }
336             else {
337 6   33     323 $parser->parse_file( $self->base, $self->file // $self->fh, $handler );
338             }
339             }
340              
341 11         15554 exit(0);
342             }
343             }
344              
345              
346             1;
347              
348             __END__
349              
350             =head1 NAME
351              
352             Catmandu::Importer::RDF - parse RDF data
353              
354             =head1 SYNOPSIS
355              
356             Command line client C<catmandu>:
357              
358             catmandu convert RDF --url http://d-nb.info/gnd/4151473-7 to YAML
359              
360             catmandu convert RDF --file rdfdump.ttl to JSON
361              
362             # Parse the input into on JSON document per triplet. This is the
363             # most memory efficient (and fastest) way to parse RDF input.
364             catmandu convert RDF --triples 1 --file rdfdump.ttl to JSON
365              
366             # Transform back into NTriples (conversions to and from triples is the
367             # most efficient way to process RDF)
368             catmandu convert RDF --triples 1 --file rdfdump.ttl to RDF --type NTriples
369              
370             # Query a SPARQL endpoint
371             catmandu convert RDF --url http://dbpedia.org/sparql
372             --sparql "SELECT ?film WHERE { ?film dct:subject <http://dbpedia.org/resource/Category:French_films> }"
373              
374             catmandu convert RDF --url http://example.org/sparql --sparql query.rq
375              
376             # Query a Linked Data Fragment endpoint
377             catmandu convert RDF --url http://fragments.dbpedia.org/2014/en
378             --sparql "SELECT ?film WHERE { ?film dct:subject <http://dbpedia.org/resource/Category:French_films> }"
379              
380             In Perl code:
381              
382             use Catmandu::Importer::RDF;
383             my $url = "http://dx.doi.org/10.2474/trol.7.147";
384             my $rdf = Catmandu::Importer::RDF->new( url => $url )->first;
385              
386             =head1 DESCRIPTION
387              
388             This L<Catmandu::Importer> can be use to import RDF data from URLs, files or
389             input streams, SPARQL endpoints, and Linked Data Fragment endpoints.
390              
391             By default an RDF graph is imported as single item in aREF format (see
392             L<RDF::aREF>).
393              
394             =head1 CONFIGURATION
395              
396             =over
397              
398             =item url
399              
400             URL to retrieve RDF from.
401              
402             =item type
403              
404             RDF serialization type (e.g. C<ttl> for RDF/Turtle).
405              
406             =item base
407              
408             Base URL. By default derived from the URL or file name.
409              
410             =item ns
411              
412             Use default namespace prefixes as provided by L<RDF::NS> to abbreviate
413             predicate and datatype URIs. Set to C<0> to disable abbreviating URIs.
414             Set to a specific date to get stable namespace prefix mappings.
415              
416             =item triples
417              
418             Import each RDF triple as one aREF subject map (default) or predicate map
419             (option C<predicate_map>), if enabled. This is the most efficient way to
420             process large input files. All the processing can be streamed.
421              
422             =item predicate_map
423              
424             Import RDF as aREF predicate map, if possible.
425              
426             =item file
427              
428             =item fh
429              
430             =item encoding
431              
432             =item fix
433              
434             Default configuration options of L<Catmandu::Importer>.
435              
436             =item sparql
437              
438             The SPARQL query to be executed on the URL endpoint (currectly only SELECT is
439             supported). The query can be supplied as string or as filename. The importer
440             tries to automatically add missing PREFIX statements from the default namespace
441             prefixes.
442              
443             =item sparql_result
444              
445             Encoding of SPARQL result values. With C<aref>, query results are encoded in
446             aREF format, with URIs in C<E<lt>> and C<E<gt>> (no qNames) and literal nodes
447             appended by C<@> and optional language code. By default (value C<simple>), all
448             RDF nodes are simplfied to their literal form.
449              
450             =item cache
451              
452             Set to a true value to cache repeated URL responses in a L<CHI> based backend.
453              
454             =item cache_options
455              
456             Provide the L<CHI> based options for caching result sets. By default a memory store of
457             1MB size is used. This is equal to:
458              
459             Catamandu::Importer::RDF->new( ...,
460             cache => 1,
461             cache_options => {
462             driver => 'Memory',
463             global => 1,
464             max_size => 1024*1024
465             });
466              
467             =item speed
468              
469             If set to a true value, then write RDF file processing speed on the STDERR as
470             number of triples parsed per second.
471              
472             =back
473              
474             =head1 METHODS
475              
476             See L<Catmandu::Importer>.
477              
478             =head1 SEE ALSO
479              
480             L<RDF::Trine::Store>, L<RDF::Trine::Parser>
481              
482             =encoding utf8
483              
484             =cut