File Coverage

blib/lib/Data/Queue/Persistent.pm
Criterion Covered Total %
statement 15 135 11.1
branch 0 64 0.0
condition 0 21 0.0
subroutine 5 24 20.8
pod 7 18 38.8
total 27 262 10.3


line stmt bran cond sub pod time code
1             package Data::Queue::Persistent;
2              
3 1     1   49339 use 5.008004;
  1         4  
  1         41  
4 1     1   5 use strict;
  1         2  
  1         46  
5 1     1   5 use warnings;
  1         6  
  1         30  
6 1     1   5 use Carp qw / croak /;
  1         1  
  1         76  
7 1     1   5915 use DBI;
  1         23579  
  1         2087  
8              
9             our $VERSION = '0.13';
10              
11             our $schema = q{
12             CREATE TABLE %s (
13             qkey VARCHAR(255) NOT NULL,
14             idx INTEGER UNSIGNED NOT NULL,
15             value BLOB,
16             PRIMARY KEY (qkey, idx)
17             )
18             };
19              
20             sub new {
21 0     0 1   my ($class, %opts) = @_;
22              
23 0           my $dsn = delete $opts{dsn};
24 0           my $dbh = delete $opts{dbh};
25              
26 0 0 0       croak "No DSN or database handle passed to Data::Queue::Persistent->new"
27             unless $dsn || $dbh;
28              
29 0           my $username = delete $opts{username};
30 0           my $pass = delete $opts{pass};
31              
32 0   0       my $cache = delete $opts{cache} || 0;
33              
34 0 0         my $key = delete $opts{id} or croak "No queue id defined";
35              
36 0   0       my $table = delete $opts{table} || 'persistent_queue';
37              
38 0           my $noload = delete $opts{noload};
39 0           my $max_size = delete $opts{max_size};
40              
41             # connect to db
42 0 0         if ($dsn) {
43 0 0         $dbh = DBI->connect($dsn, $username, $pass)
44             or croak "Could not connect to database";
45             }
46              
47 0           my $self = {
48             cache => $cache,
49             dbh => $dbh,
50             q => [],
51             key => $key,
52             table_name => $table,
53             max_size => $max_size,
54             };
55              
56 0           bless $self, $class;
57 0           $self->init;
58 0 0 0       $self->load if $self->caching && ! $noload;
59              
60 0           return $self;
61             }
62              
63             sub table_name {
64 0     0 0   my $self = shift;
65 0           return $self->dbh->quote_identifier($self->{table_name});
66             }
67              
68 0     0 0   sub dbh { $_[0]->{dbh} }
69 0     0 0   sub key { $_[0]->{key} }
70 0     0 0   sub q { $_[0]->{q} }
71 0     0 0   sub caching { $_[0]->{cache} }
72 0     0 0   sub max_size { $_[0]->{max_size} }
73              
74             # returns how many items are in the queue
75             sub length {
76 0     0 1   my $self = CORE::shift();
77              
78 0 0         return (scalar @{$self->{q}}) if $self->caching;
  0            
79              
80 0           my $table = $self->table_name;
81 0           my ($length) = $self->dbh->selectrow_array("SELECT COUNT(idx) FROM $table WHERE qkey=?",
82             undef, $self->key);
83 0 0         die $self->dbh->errstr if $self->dbh->err;
84              
85 0   0       return $length || 0;
86             }
87              
88             sub _max_idx {
89 0     0     my $self = CORE::shift();
90              
91             # TODO: cache max index
92              
93 0           my $table = $self->table_name;
94 0           my ($idx) = $self->dbh->selectrow_array("SELECT MAX(idx) FROM $table WHERE qkey=?",
95             undef, $self->key);
96 0 0         die $self->dbh->errstr if $self->dbh->err;
97              
98 0 0         return defined $idx ? $idx + 1 : 0;
99             }
100              
101             # do a sql statement and die if it fails
102             sub do {
103 0     0 0   my ($self, $sql, @vals) = @_;
104              
105 0           $self->dbh->do($sql, undef, @vals);
106 0 0         croak $self->dbh->errstr if $self->dbh->err;
107             }
108              
109             # initialize the storage
110             sub init {
111 0     0 0   my ($self) = @_;
112              
113 0 0         croak "No table name defined" unless $self->table_name;
114              
115             # don't do anything if table already exists
116 0 0         return if $self->table_exists;
117              
118             # table doesn't exist, create it
119 0           my $sql = sprintf($schema, $self->table_name);
120 0           $self->do($sql);
121             }
122              
123             # load data from db
124             sub load {
125 0     0 0   my $self = CORE::shift();
126              
127 0 0         my $table = $self->table_name or croak "No table name defined";
128 0 0         die "Table $table does not exist." unless $self->table_exists;
129              
130 0           my $rows = $self->dbh->selectall_arrayref("SELECT value FROM $table WHERE qkey=? ORDER BY idx", undef, $self->key);
131 0 0         die $self->dbh->errstr if $self->dbh->err;
132              
133 0 0 0       return unless $rows && @$rows;
134 0           $self->absorb_rows(@$rows);
135             }
136              
137             sub absorb_rows {
138 0     0 0   my ($self, @rows) = @_;
139 0           push @{$self->{q}}, map { $_->[0] } @rows;
  0            
  0            
140             }
141              
142             # delete everything from the queue
143             sub empty {
144 0     0 1   my ($self) = @_;
145              
146 0           my $table = $self->table_name;
147 0           $self->do("DELETE FROM $table WHERE qkey=?", $self->key);
148              
149 0 0         $self->{q} = [] if $self->caching;
150             }
151              
152             sub table_exists {
153 0     0 0   my $self = CORE::shift();
154             # get table info, see if our table exists
155 0           my @tables = $self->dbh->tables(undef, undef, $self->{table_name}, "TABLE");
156 0           my $table = $self->{table_name};
157              
158 0 0         $table = $self->dbh->quote_identifier($self->{table_name})
159             if $self->dbh->get_info(29); # quote if the db driver uses table name quoting
160              
161 0           return grep { $_ =~ m/$table$/ } @tables;
  0            
162             }
163              
164             # add @vals to the queue
165             *add = \&unshift;
166             sub unshift {
167 0     0 1   my ($self, @vals) = @_;
168              
169 0           my $idx = $self->_max_idx;
170              
171 0           my $key = $self->dbh->quote($self->key);
172 0           my $table = $self->table_name;
173 0           my $dbh = $self->dbh;
174              
175 0           $dbh->begin_work;
176 0           my $sth = $dbh->prepare(qq[ INSERT INTO $table (qkey, idx, value) VALUES ($key, ?, ?) ]);
177              
178 0           foreach my $val (@vals) {
179 0 0         push @{$self->{q}}, $val if $self->caching;
  0            
180              
181 0           $sth->execute($idx++, $val);
182 0 0         if ($dbh->err) {
183 0           die $dbh->errstr;
184 0           $dbh->rollback;
185             }
186             }
187              
188 0           $dbh->commit;
189              
190             # truncate queue to max_size
191 0           my $max_size = $self->max_size;
192 0           my $length = $self->length;
193 0 0 0       $self->shift($length - $max_size) if defined $max_size && $length > $max_size;
194             }
195              
196             # shift $count elements off the queue
197             *remove = \&shift;
198             sub shift {
199 0     0 1   my ($self, $_count) = @_;
200              
201 0 0         my $count = defined $_count ? $_count : 1;
202 0           $count += 0;
203              
204 0 0         if ($self->caching) {
205 0           CORE::shift(@{$self->{q}}) for 1 .. $count;
  0            
206             }
207              
208 0           my $table = $self->table_name;
209              
210             # begin transaction
211 0           $self->dbh->begin_work;
212              
213             # get $count elements
214 0           my $rows = $self->dbh->selectall_arrayref("SELECT idx, value FROM $table WHERE qkey = ? ORDER BY idx LIMIT $count",
215             undef, $self->key);
216 0 0         die $self->dbh->errstr if $self->dbh->err;
217              
218 0           my @idx = map { $_->[0] } @$rows;
  0            
219 0           my @vals = map { $_->[1] } @$rows;
  0            
220              
221 0 0         return () unless @vals;
222              
223             # remove the retreived elements
224 0           my $bindstr = join(',', map { '?' } @idx);
  0            
225 0           $self->do("DELETE FROM $table WHERE qkey=? AND idx BETWEEN ? AND ?", $self->key, $idx[0], $idx[-1]);
226              
227             # commit transaction
228 0           $self->dbh->commit;
229              
230             # return first element if no $count defined, otherwise return array of values
231 0 0         return $vals[0] unless defined $_count;
232 0           return @vals;
233             }
234              
235             # retreive elements at an index
236             sub get {
237 0     0 1   my ($self, $offset, $length, %opts) = @_;
238              
239 0 0 0       return $self->all unless defined $offset || defined $length;
240              
241 0 0         $length = -1 unless defined $length; # need to specify a limit when selecting an offset, this is wack
242 0           $offset += 0;
243              
244 0 0         my $direction = $opts{reverse} ? "DESC" : '';
245              
246 0           my $table = $self->table_name;
247 0           my $rows = $self->dbh->selectcol_arrayref("SELECT value FROM $table WHERE qkey = ? ORDER BY idx $direction LIMIT $length OFFSET $offset",
248             undef, $self->key);
249 0 0         die $self->dbh->errstr if $self->dbh->err;
250              
251 0 0         return wantarray ? @$rows : $rows->[0];
252             }
253              
254             # returns all elements of the queue
255             sub all {
256 0     0 1   my $self = CORE::shift();
257              
258 0 0         return @{$self->{q}} if $self->caching;
  0            
259              
260 0           my $valsref = $self->dbh->selectall_arrayref("SELECT value FROM " . $self->table_name . " WHERE qkey = ?",
261             undef, $self->key);
262 0           return map { @$_ } @$valsref;
  0            
263             }
264              
265             1;
266              
267             __END__