File Coverage

blib/lib/RDF/Trine/Store/Redis.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             RDF::Trine::Store::Redis - RDF Store for Redis
4              
5             =head1 VERSION
6              
7             This document describes RDF::Trine::Store::Redis version 1.018
8              
9             =head1 SYNOPSIS
10              
11             use RDF::Trine::Store::Redis;
12              
13             =head1 DESCRIPTION
14              
15             RDF::Trine::Store::Redis provides a RDF::Trine::Store API to interact with a
16             Redis server.
17              
18             =cut
19              
20             package RDF::Trine::Store::Redis;
21              
22 1     1   425 use strict;
  1         2  
  1         24  
23 1     1   5 use warnings;
  1         2  
  1         22  
24 1     1   4 no warnings 'redefine';
  1         2  
  1         33  
25 1     1   5 use base qw(RDF::Trine::Store);
  1         2  
  1         55  
26              
27 1     1   273 use Redis;
  0            
  0            
28             use Cache::LRU;
29             use URI::Escape;
30             use Data::Dumper;
31             use Digest::MD5 qw(md5_base64);
32             use List::Util qw(first);
33             use List::MoreUtils qw(zip);
34             use Scalar::Util qw(refaddr reftype blessed);
35             use HTTP::Request::Common ();
36             use JSON;
37              
38             use RDF::Trine::Error qw(:try);
39              
40             ######################################################################
41              
42             our $CACHING = 1;
43              
44             my @pos_names;
45             our $VERSION;
46             BEGIN {
47             $VERSION = "1.018";
48             my $class = __PACKAGE__;
49             $RDF::Trine::Store::STORE_CLASSES{ $class } = $VERSION;
50             @pos_names = qw(subject predicate object context);
51             }
52              
53             ######################################################################
54              
55             =head1 METHODS
56              
57             Beyond the methods documented below, this class inherits methods from the
58             L<RDF::Trine::Store> class.
59              
60             =over 4
61              
62             =item C<< new ( $server ) >>
63              
64             Returns a new storage object.
65              
66             =item C<new_with_config ( $hashref )>
67              
68             Returns a new storage object configured with a hashref with certain
69             keys as arguments.
70              
71             The C<storetype> key must be C<Redis> for this backend.
72              
73             The following key must also be used:
74              
75             =over
76              
77             =item foo
78              
79             description
80              
81             =back
82              
83             =cut
84              
85             sub new {
86             my $class = shift;
87             my %args = @_;
88             my $size = delete $args{cache_size};
89             $size = 128 unless (defined($size) and $size > 0);
90             my $r = Redis->new( %args );
91             my $cache = Cache::LRU->new( size => $size );
92             my $self = bless({ conn => $r, cache => $cache, cache_size => $size }, $class);
93             return $self;
94             }
95              
96             =item C<< conn >>
97              
98             Returns the Redis connection object.
99              
100             =cut
101              
102             sub conn {
103             my $self = shift;
104             return $self->{conn};
105             }
106              
107             =item C<< cache >>
108              
109             Returns the Cache::LRU object used to cache frequently used redis data.
110              
111             =cut
112              
113             sub cache {
114             my $self = shift;
115             return $self->{cache};
116             }
117              
118             sub _new_with_string {
119             my $class = shift;
120             my $config = shift;
121             return $class->new( $config );
122             }
123              
124             =item C<< new_with_config ( \%config ) >>
125              
126             Returns a new RDF::Trine::Store object based on the supplied configuration hashref.
127              
128             =cut
129              
130             sub new_with_config {
131             my $proto = shift;
132             my $config = shift;
133             $config->{storetype} = 'Redis';
134             return $proto->SUPER::new_with_config( $config );
135             }
136              
137             sub _new_with_config {
138             my $class = shift;
139             my $config = shift;
140             return $class->new( server => $config->{server}, cache_size => $config->{cache_size} );
141             }
142              
143             sub _config_meta {
144             return {
145             required_keys => [qw(server)],
146             fields => {
147             server => { description => 'server:port', type => 'string' },
148             cache_size => { description => 'cache size', type => 'int' },
149             }
150             }
151             }
152              
153             sub _id_node {
154             my $self = shift;
155             my @id = @_;
156             my $r = $self->conn;
157             my $p = RDF::Trine::Parser::NTriples->new();
158            
159             my @nodes;
160             foreach my $id (@id) {
161             my $bucket = int($id / 1000);
162             my $hid = $id % 1000;
163             my $key = "R:n.v:$bucket";
164             my $nt = $r->hget($key, $hid);
165             my $node = $p->parse_node( $nt );
166             push(@nodes, $node);
167             }
168             return @nodes;
169             }
170              
171             sub _get_node_id {
172             my $self = shift;
173             my @node = @_;
174             my $r = $self->conn;
175             my $s = RDF::Trine::Serializer::NTriples->new();
176             my @str = map { $s->serialize_node( $_ ) } @node;
177             my @ids;
178             foreach my $nt (@str) {
179             my $md5 = md5_base64($nt);
180             my $key = "R:n.i:$md5";
181             my $id = $r->get($key);
182             push(@ids, $id);
183             }
184             return wantarray ? @ids : $ids[0];
185             }
186              
187             sub _get_or_set_node_id {
188             my $self = shift;
189             my $node = shift;
190             my $r = $self->conn;
191             my $s = RDF::Trine::Serializer::NTriples->new();
192             my $nt = $s->serialize_node( $node );
193            
194             my $md5 = md5_base64($nt);
195             my $idkey = "R:n.i:$md5";
196             my $id = $r->get( $idkey );
197             return $id if (defined($id));
198            
199             $id = $r->incr( 'RT:node.next' );
200            
201             $r->set($idkey, $id);
202            
203             my $bucket = int($id / 1000);
204             my $hid = $id % 1000;
205            
206             my $valkey = "R:n.v:$bucket";
207             $r->hset( $valkey, $hid, $nt );
208            
209             return $id;
210             }
211              
212             =item C<< add_statement ( $statement [, $context] ) >>
213              
214             Adds the specified C<$statement> to the underlying model.
215              
216             =cut
217              
218             sub add_statement {
219             my $self = shift;
220             my $st = shift;
221             my $context = shift;
222             unless (blessed($st) and $st->isa('RDF::Trine::Statement')) {
223             throw RDF::Trine::Error::MethodInvocationError -text => "Not a valid statement object passed to add_statement";
224             }
225            
226             if ($st->isa('RDF::Trine::Statement::Quad') and blessed($context)) {
227             throw RDF::Trine::Error::MethodInvocationError -text => "add_statement cannot be called with both a quad and a context";
228             }
229            
230             if ($self->_bulk_ops) {
231             push(@{ $self->{ ops } }, ['_add_statements', $st, $context]);
232             } else {
233             my $r = $self->conn;
234             my @nodes = $st->nodes;
235             $nodes[3] = $context if ($context);
236             @nodes = map { defined($_) ? $_ : RDF::Trine::Node::Nil->new } @nodes[0..3];
237             my @ids = map { $self->_get_or_set_node_id($_) } @nodes;
238             my $key = join(':', @ids);
239             my @keys = qw(s p o g);
240             $r->hmset( "RT:spog:$key", zip @keys, @ids );
241             $r->sadd( "RT:sset:$ids[0]", $key );
242             $r->sadd( "RT:pset:$ids[1]", $key );
243             $r->sadd( "RT:oset:$ids[2]", $key );
244             $r->sadd( "RT:gset:$ids[3]", $key );
245             }
246             return;
247             }
248              
249             =item C<< remove_statement ( $statement [, $context]) >>
250              
251             Removes the specified C<$statement> from the underlying model.
252              
253             =cut
254              
255             sub remove_statement {
256             my $self = shift;
257             my $st = shift;
258             my $context = shift;
259            
260             unless (blessed($st) and $st->isa('RDF::Trine::Statement')) {
261             throw RDF::Trine::Error::MethodInvocationError -text => "Not a valid statement object passed to remove_statement";
262             }
263            
264             if ($st->isa('RDF::Trine::Statement::Quad') and blessed($context)) {
265             throw RDF::Trine::Error::MethodInvocationError -text => "remove_statement cannot be called with both a quad and a context";
266             }
267            
268             if ($self->_bulk_ops) {
269             push(@{ $self->{ ops } }, ['_remove_statements', $st, $context]);
270             } else {
271             my $r = $self->conn;
272             my @nodes = $st->nodes;
273             $nodes[3] = $context if ($context);
274             @nodes = map { defined($_) ? $_ : RDF::Trine::Node::Nil->new } @nodes[0..3];
275             my @ids = $self->_get_node_id(@nodes);
276             foreach my $i (@ids) {
277             return unless defined($i);
278             }
279             my $key = join(':', @ids);
280             $r->del( "RT:spog:$key" );
281             $r->srem( "RT:sset:$ids[0]", $key );
282             $r->srem( "RT:pset:$ids[1]", $key );
283             $r->srem( "RT:oset:$ids[2]", $key );
284             $r->srem( "RT:gset:$ids[3]", $key );
285             }
286             return;
287             }
288              
289             =item C<< remove_statements ( $subject, $predicate, $object [, $context]) >>
290              
291             Removes the specified C<$statement> from the underlying model.
292              
293             =cut
294              
295             sub remove_statements {
296             my $self = shift;
297             my @nodes = @_[0..3];
298             my $st = RDF::Trine::Statement->new( @nodes[0..2] );
299             my $context = $nodes[3];
300            
301             if ($self->_bulk_ops) {
302             push(@{ $self->{ ops } }, ['_remove_statement_patterns', $st, $context]);
303             } else {
304             my @strs = map { (not(blessed($_)) or $_->is_variable) ? '*' : $self->_get_or_set_node_id($_) } @nodes;
305             my $key = 'RT:spog:' . join(':', @strs);
306             my $r = $self->conn;
307             foreach my $k ($r->keys($key)) {
308             my ($sid, $pid, $oid, $gid) = $k =~ m/RT:spog:(\d+):(\d+):(\d+):(\d+)/;
309             $r->srem( "RT:sset:$sid", $_ ) for ($r->smembers("RT:sset:$sid"));
310             $r->srem( "RT:pset:$pid", $_ ) for ($r->smembers("RT:pset:$pid"));
311             $r->srem( "RT:oset:$oid", $_ ) for ($r->smembers("RT:oset:$oid"));
312             $r->srem( "RT:gset:$gid", $_ ) for ($r->smembers("RT:gset:$gid"));
313             $r->del( $k );
314             }
315             }
316             return;
317             }
318              
319             =item C<< get_statements ($subject, $predicate, $object [, $context] ) >>
320              
321             Returns a stream object of all statements matching the specified subject,
322             predicate and objects. Any of the arguments may be undef to match any value.
323              
324             =cut
325              
326             sub get_statements {
327             my $self = shift;
328             my @nodes = @_;
329            
330             my $use_quad = 0;
331             if (scalar(@_) >= 4) {
332             $use_quad = 1;
333             } elsif (scalar(@nodes) != 3) {
334             $#nodes = 3;
335             $use_quad = 1;
336             }
337            
338             my @var_map = qw(s p o g);
339             my %var_map = map { $var_map[$_] => $_ } (0 .. $#var_map);
340             my @node_map;
341             foreach my $i (0 .. $#nodes) {
342             if (not(blessed($nodes[$i])) or $nodes[$i]->is_variable) {
343             $nodes[$i] = RDF::Trine::Node::Variable->new( $var_map[ $i ] );
344             }
345             }
346            
347             my $sub;
348             if ($use_quad) {
349             my $r = $self->conn;
350             my @skeys;
351             my @indexes = qw(s p o g);
352             foreach my $i (0 .. $#indexes) {
353             my $index = $indexes[$i];
354             my $n = $nodes[$i];
355             unless ($n->is_variable) {
356             my $id = $self->_get_node_id($n);
357             unless (defined($id)) {
358             return RDF::Trine::Iterator::Graph->new( [] );
359             }
360             my $key = "RT:${index}set:$id";
361             push(@skeys, $key);
362             }
363             }
364             if (@skeys) {
365             my @keys = $r->sinter(@skeys);
366             $sub = sub {
367             return unless (scalar(@keys));
368             my $key = shift(@keys);
369             my @data = split(':', $key);
370             my @nodes = $self->_id_node( @data[0..3] );
371             my $st = RDF::Trine::Statement::Quad->new( @nodes );
372             return $st;
373             };
374             } else {
375             my @strs = map { ($_->is_variable) ? '*' : $self->_get_node_id($_) } @nodes;
376             my $key = 'RT:spog:' . join(':', @strs);
377             my @keys = $r->keys($key);
378             $sub = sub {
379             return unless (scalar(@keys));
380             my $key = shift(@keys);
381             (undef, undef, my @data) = split(':', $key);
382             my @nodes = $self->_id_node( @data );
383             my $st = RDF::Trine::Statement::Quad->new( @nodes );
384             return $st;
385             };
386             }
387             } else {
388             my $r = $self->conn;
389             my @skeys;
390             my @indexes = qw(s p o);
391             foreach my $i (0 .. $#indexes) {
392             my $index = $indexes[$i];
393             my $n = $nodes[$i];
394             unless ($n->is_variable) {
395             my $id = $self->_get_node_id($n);
396             unless (defined($id)) {
397             return RDF::Trine::Iterator::Graph->new( [] );
398             }
399             my $key = "RT:${index}set:$id";
400             push(@skeys, $key);
401             }
402             }
403             if (@skeys) {
404             my @keys = $r->sinter(@skeys);
405             my %keys;
406             foreach (@keys) {
407             s/:[^:]+$//;
408             $keys{ $_ }++;
409             }
410             @keys = keys %keys;
411             $sub = sub {
412             return unless (scalar(@keys));
413             my $key = shift(@keys);
414             my @data = split(':', $key);
415             my @nodes = $self->_id_node( @data[0..2] );
416             my $st = RDF::Trine::Statement->new( @nodes );
417             return $st;
418             };
419             } else {
420             my @strs = map { ($_->is_variable) ? '*' : $self->_get_node_id($_) } @nodes[0..2];
421             my $key = 'RT:spog:' . join(':', @strs, '*');
422             my %triples;
423             foreach ($r->keys($key)) {
424             s/:[^:]+$//;
425             $triples{ $_ }++;
426             }
427             my @keys = keys %triples;
428             $sub = sub {
429             return unless (scalar(@keys));
430             my $key = shift(@keys);
431             my ($ids) = $key =~ m/^RT:spog:(.*)$/;
432             my @data = split(':', $ids);
433             my @nodes = $self->_id_node( @data );
434             my $st = RDF::Trine::Statement->new( @nodes );
435             return $st;
436             };
437             }
438             }
439             return RDF::Trine::Iterator::Graph->new( $sub );
440             }
441              
442             =item C<< count_statements ( $subject, $predicate, $object, $context ) >>
443              
444             Returns a count of all the statements matching the specified subject,
445             predicate, object, and context. Any of the arguments may be undef to match any
446             value.
447              
448             =cut
449              
450             sub count_statements {
451             my $self = shift;
452             my $use_quad = 0;
453             if (scalar(@_) >= 4) {
454             $use_quad = 1;
455             # warn "count statements with quad" if ($::debug);
456             }
457             my @nodes = @_;
458             my @strs;
459             foreach my $n (@nodes[0..3]) {
460             if (not(blessed($n)) or $n->is_variable) {
461             push(@strs, '*');
462             } else {
463             my $id = $self->_get_node_id($n);
464             unless (defined($id)) {
465             return 0;
466             }
467             push(@strs, $id);
468             }
469             }
470              
471             if ($use_quad) {
472             my $key = 'RT:spog:' . join(':', @strs);
473             my $r = $self->conn;
474             my @keys = $r->keys($key);
475             return scalar(@keys);
476             } else {
477             my $key = 'RT:spog:' . join(':', @strs);
478             my $r = $self->conn;
479             my @keys = $r->keys($key);
480             my %keys;
481             foreach (@keys) {
482             s/:[^:]+$//;
483             $keys{ $_ }++;
484             }
485             @keys = keys %keys;
486             return scalar(@keys);
487             }
488             }
489              
490             =item C<< get_contexts >>
491              
492             Returns an RDF::Trine::Iterator over the RDF::Trine::Node objects comprising
493             the set of contexts of the stored quads.
494              
495             =cut
496              
497             sub get_contexts {
498             my $self = shift;
499             my $r = $self->conn;
500             my @keys = $r->keys('RT:spog:*');
501             my %graphs;
502             foreach (@keys) {
503             s/^.*://;
504             $graphs{ $_ }++;
505             }
506             my @nodes = grep { not($_->isa('RDF::Trine::Node::Nil')) } $self->_id_node(keys %graphs);
507             return RDF::Trine::Iterator->new( \@nodes );
508             }
509              
510              
511             =item C<< supports ( [ $feature ] ) >>
512              
513             If C<< $feature >> is specified, returns true if the feature is supported by the
514             store, false otherwise. If C<< $feature >> is not specified, returns a list of
515             supported features.
516              
517             =cut
518              
519             sub supports {
520             my $self = shift;
521             my %features = map { $_ => 1 } (
522             # 'http://www.w3.org/ns/sparql-service-description#SPARQL10Query',
523             # 'http://www.w3.org/ns/sparql-service-description#SPARQL11Query',
524             );
525             if (@_) {
526             my $f = shift;
527             return $features{ $f };
528             } else {
529             return keys %features;
530             }
531             }
532              
533             # =item C<< get_sparql ( $sparql ) >>
534             #
535             # Returns an iterator object of all bindings matching the specified SPARQL query.
536             #
537             # =cut
538             #
539             # sub get_sparql {
540             # my $self = shift;
541             # my $sparql = shift;
542             # throw RDF::Trine::Error::UnimplementedError -text => "get_sparql not implemented for Redis stores yet";
543             # }
544              
545             sub _bulk_ops {
546             return 0;
547             }
548              
549             sub _begin_bulk_ops {
550             return 0;
551             }
552              
553             sub _end_bulk_ops {
554             my $self = shift;
555             if (scalar(@{ $self->{ ops } || []})) {
556             my @ops = splice(@{ $self->{ ops } });
557             my @aggops = $self->_group_bulk_ops( @ops );
558             my @sparql;
559             warn '_end_bulk_ops: ' . Dumper(\@aggops);
560             throw RDF::Trine::Error::UnimplementedError -text => "bulk operations not implemented for Redis stores yet";
561             }
562             $self->{BulkOps} = 0;
563             }
564              
565             =item C<< nuke >>
566              
567             Permanently removes the store and its data.
568              
569             =cut
570              
571             sub nuke {
572             my $self = shift;
573             my $r = $self->conn;
574             $r->del('RT:node.next');
575             foreach my $k ($r->keys('R:n.i:*')) {
576             $r->del($k);
577             }
578             foreach my $k ($r->keys('R:n.v:*')) {
579             $r->del($k);
580             }
581             foreach my $k ($r->keys('RT:spog:*')) {
582             $r->del($k);
583             }
584             $r->del($_) foreach ($r->keys('RT:sset:*'));
585             $r->del($_) foreach ($r->keys('RT:pset:*'));
586             $r->del($_) foreach ($r->keys('RT:oset:*'));
587             $r->del($_) foreach ($r->keys('RT:gset:*'));
588            
589             $self->{cache} = Cache::LRU->new( size => $self->{cache_size} );
590             }
591              
592              
593             sub _dump {
594             my $self = shift;
595             my $r = $self->conn;
596             my @keys = $r->keys('RT:spog:*');
597             warn "--------------------------------------\n";
598             warn '*** DUMP Redis statements:';
599             warn "$_\n" foreach (@keys);
600             }
601              
602             1;
603              
604             __END__
605              
606             =back
607              
608             =head1 REDIS DATA LAYOUT
609              
610             ...
611              
612             =head1 BUGS
613              
614             Please report any bugs or feature requests to through the GitHub web interface
615             at L<https://github.com/kasei/perlrdf/issues>.
616              
617             =head1 AUTHOR
618              
619             Gregory Todd Williams C<< <gwilliams@cpan.org> >>
620              
621             =head1 COPYRIGHT
622              
623             Copyright (c) 2006-2012 Gregory Todd Williams. This
624             program is free software; you can redistribute it and/or modify it under
625             the same terms as Perl itself.
626              
627             =cut