File Coverage

blib/lib/Queue/Q4Pg/Lite.pm
Criterion Covered Total %
statement 24 116 20.6
branch 0 26 0.0
condition 0 3 0.0
subroutine 8 19 42.1
pod 8 9 88.8
total 40 173 23.1


line stmt bran cond sub pod time code
1             package Queue::Q4Pg::Lite;
2              
3 2     2   2907 use strict;
  2         4  
  2         71  
4 2     2   10 use warnings;
  2         4  
  2         80  
5             our $VERSION = '0.03';
6              
7 2     2   11 use Carp ();
  2         4  
  2         38  
8 2     2   1839 use Any::Moose;
  2         98976  
  2         15  
9 2     2   12162 use DBI;
  2         58309  
  2         165  
10 2     2   3161 use SQL::Abstract;
  2         29935  
  2         307  
11              
12             has 'auto_reconnect' => (
13             is => 'rw',
14             isa => 'Bool',
15             required => 1,
16             default => 1,
17             );
18              
19             has 'connect_info' => (
20             is => 'rw',
21             isa => 'ArrayRef',
22             required => 1,
23             );
24              
25             has 'sql_maker' => (
26             is => 'rw',
27             isa => 'SQL::Abstract',
28             required => 1,
29             default => sub { SQL::Abstract->new }
30             );
31              
32             has '_dbh' => (
33             is => 'rw',
34             );
35              
36             has '_res' => (
37             is => 'rw',
38             );
39              
40             has 'interval' => (
41             is => 'rw',
42             isa => 'Int',
43             default => 5,
44             );
45              
46 2     2   28 use constant PG_ADVISORY_LOCK_SUPPORT_VERSION => 80200;
  2         5  
  2         200  
47              
48 2     2   11 no Any::Moose;
  2         6  
  2         21  
49              
50             sub connect {
51 0     0 1   my $self = shift;
52 0 0         if (! ref $self) {
53 0           $self = $self->new(@_);
54             }
55              
56 0 0         if (my $old = $self->_dbh()) {
57 0           $old->disconnect();
58             }
59              
60 0           my $dbh = $self->_connect();
61 0           $self->_dbh( $dbh );
62              
63 0           my $version = $dbh->{pg_server_version};
64 0 0         if ( $version < PG_ADVISORY_LOCK_SUPPORT_VERSION ) {
65 0 0         Carp::confess( "Connected database does not support pg_advisory_lock(). required PostgreSQL version (" . PG_ADVISORY_LOCK_SUPPORT_VERSION . "). Got version " . (defined $version ? $version : '(undef)' ) );
66             }
67 0           $self;
68             }
69              
70             sub _connect {
71 0     0     my $self = shift;
72              
73 0           return DBI->connect(@{ $self->connect_info });
  0            
74             }
75              
76             sub dbh {
77 0     0 1   my $self = shift;
78 0           my $dbh = $self->_dbh;
79              
80 0 0 0       if ( ! $dbh || ! $dbh->ping ) {
81 0 0         $self->auto_reconnect or die "not connect";
82 0           $dbh = $self->_connect();
83 0           $self->_dbh( $dbh );
84             }
85 0           return $dbh;
86             }
87              
88             sub next {
89 0     0 1   my $self = shift;
90 0           my $table = shift;
91 0           my ( $where ) = @_;
92              
93 0 0         if ( my $pre = $self->_res ) {
94 0           Carp::carp( 'abort not finished job. id='. $pre->{id} );
95 0           $self->abort;
96             }
97 0           my $dbh = $self->dbh;
98 0           my $sql = "SELECT * FROM $table";
99 0           my ($sql_where, @bind) = $self->sql_maker->where($where);
100 0 0         if ( $sql_where eq '' ) {
101 0           $sql .= " WHERE pg_try_advisory_lock(tableoid::int, id)";
102             }
103             else {
104 0           (my $cond = $sql_where) =~ s/^\s+WHERE\s//i;
105 0           $sql .= " WHERE CASE WHEN $cond THEN pg_try_advisory_lock(tableoid::int, id) ELSE false END";
106             }
107 0           $sql .= " LIMIT 1";
108              
109 0           my $sth = $dbh->prepare($sql);
110              
111 0           while ( $sth->execute(@bind) ) {
112 0           my $res = $sth->fetchrow_hashref;
113 0 0         if ($res) {
114 0           $res->{_table} = $table;
115 0           $self->_res($res);
116 0           return $res;
117             }
118 0           sleep $self->interval;
119             }
120 0           return;
121             }
122              
123             *fetch = \&fetch_hashref;
124              
125             sub fetch_hashref {
126 0     0 1   my $self = shift;
127 0           return $self->_res;
128             }
129              
130             sub abort {
131 0     0 0   my $self = shift;
132              
133 0 0         return unless $self->_res;
134              
135 0           my $dbh = $self->_dbh;
136 0           my $res = $self->_res;
137 0           my $sql
138             = "SELECT pg_advisory_unlock(tableoid::int, ?) FROM "
139             . $res->{_table};
140 0           my $sth = $dbh->prepare($sql);
141 0           $sth->execute( $res->{id} );
142 0           my $r = $sth->fetchrow_arrayref;
143 0           $sth->finish;
144 0           return $r->[0];
145             }
146              
147             sub ack {
148 0     0 1   my $self = shift;
149              
150 0 0         return unless $self->_res;
151              
152 0           my $dbh = $self->_dbh;
153 0           my $res = $self->_res;
154              
155 0           my ($sql, @bind) = $self->sql_maker->delete(
156             $res->{_table},
157             { id => $res->{id} },
158             );
159 0           $sql .= " RETURNING pg_advisory_unlock(tableoid::int, id)";
160 0           my $sth = $dbh->prepare($sql);
161 0           $sth->execute(@bind);
162 0           my $r = $sth->fetchrow_arrayref;
163 0           $sth->finish;
164              
165 0           $self->_res(undef);
166 0           return $r->[0];
167             }
168              
169             sub insert {
170 0     0 1   my $self = shift;
171 0           my $table = shift;
172              
173 0           my ($sql, @bind) = $self->sql_maker->insert($table, @_);
174 0           my $dbh = $self->dbh;
175 0           my $sth = $dbh->prepare($sql);
176 0           my $rv = $sth->execute(@bind);
177 0           $sth->finish;
178 0           return $rv;
179             }
180              
181             sub disconnect {
182 0     0 1   my $self = shift;
183 0 0         $self->_dbh->disconnect if $self->_dbh;
184 0           $self->_dbh(undef);
185 0           $self->_res(undef);
186 0           1;
187             }
188              
189             sub clear {
190 0     0 1   my $self = shift;
191 0           my $table = shift;
192 0           my ($sql, @bind) = $self->sql_maker->delete(
193             $table,
194             { "pg_try_advisory_lock(tableoid::int, id)" => \"" },
195             );
196 0           $sql .= " RETURNING pg_advisory_unlock(tableoid::int, id)";
197 0           my $sth = $self->dbh->prepare($sql);
198 0           my $rows = $sth->execute();
199 0           $sth->finish();
200 0           return $rows;
201             }
202              
203             sub DESTROY {
204 0     0     my $self = shift;
205 0 0         $self->abort if $self->_res;
206 0           $self->disconnect;
207             }
208              
209             1;
210             __END__