File Coverage

blib/lib/Queue/Q4M.pm
Criterion Covered Total %
statement 27 75 36.0
branch 0 20 0.0
condition 0 14 0.0
subroutine 9 14 64.2
pod n/a
total 36 123 29.2


line stmt bran cond sub pod time code
1             # $Id: /mirror/coderepos/lang/perl/Queue-Q4M/trunk/lib/Queue/Q4M.pm 103794 2009-04-13T11:38:30.159603Z daisuke $
2             #
3             # Copyright (c) 2008 Daisuke Maki
4             # All rights reserved.
5              
6             package Queue::Q4M;
7 3     3   19083 use Any::Moose;
  3         178162  
  3         22  
8 3     3   1816 use Any::Moose '::Util::TypeConstraints';
  3         7  
  3         14  
9 3     3   1131 use Carp();
  3         8  
  3         64  
10 3     3   17398 use DBI;
  3         73746  
  3         311  
11 3     3   4777 use SQL::Abstract;
  3         38071  
  3         162  
12 3     3   2179 use Queue::Q4M::Status;
  3         10  
  3         550  
13              
14             class_type 'Queue::Q4M::Result';
15              
16             has 'auto_reconnect' => (
17             is => 'rw',
18             isa => 'Bool',
19             required => 1,
20             default => 1,
21             );
22              
23             has 'owner_mode' => (
24             is => 'rw',
25             isa => 'Bool',
26             default => 0
27             );
28              
29             has '_connect_pid' => (
30             is => 'rw',
31             isa => 'Int'
32             );
33              
34             has 'connect_info' => (
35             is => 'rw',
36             isa => 'ArrayRef',
37             required => 1,
38             );
39              
40             has 'sql_maker' => (
41             is => 'rw',
42             isa => 'SQL::Abstract',
43             required => 1,
44             default => sub { SQL::Abstract->new }
45             );
46              
47             has '_dbh' => (
48             is => 'rw',
49             );
50              
51             has '__table' => (
52             is => 'rw',
53             );
54              
55             has '__res' => (
56             is => 'rw',
57             # isa => 'Maybe[Queue::Q4M::Result]'
58             );
59              
60             __PACKAGE__->meta->make_immutable;
61              
62 3     3   56 no Any::Moose;
  3         5  
  3         31  
63 3     3   968 no Any::Moose '::Util::TypeConstraints';
  3         5  
  3         14  
64              
65             our $AUTHORITY = 'cpan:DMAKI';
66             our $VERSION = '0.00019';
67              
68 3     3   2283 use constant Q4M_MINIMUM_VERSION => '0.8';
  3         7  
  3         7162  
69              
70             sub connect
71             {
72 0     0     my $self = shift;
73 0 0         if (! ref $self) {
74 0           $self = $self->new(@_);
75             }
76              
77 0 0         if (my $old = $self->_dbh()) {
78 0           $old->disconnect();
79             }
80              
81 0           my $dbh = $self->_connect();
82 0           $self->_dbh( $dbh );
83              
84             # Make sure we have the minimum supported API version
85             # (or, a Q4M enabled mysql, for that matter)
86 0           my $version;
87 0           eval {
88 0           my $sth = $dbh->prepare(<<' EOSQL');
89             SELECT PLUGIN_VERSION from
90             information_schema.plugins
91             WHERE plugin_name = ?
92             EOSQL
93 0           $sth->execute('QUEUE');
94 0           $sth->bind_columns(\$version);
95 0           $sth->fetchrow_arrayref;
96 0           $sth->finish;
97             };
98 0 0         warn if $@;
99              
100 0 0 0       if (! $version || $version < Q4M_MINIMUM_VERSION) {
101 0 0         Carp::confess( "Connected database does not meet the minimum required q4m version (" . Q4M_MINIMUM_VERSION . "). Got version " . (defined $version ? $version : '(undef)' ) );
102             }
103              
104 0           $self;
105             }
106              
107             sub _connect
108             {
109 0     0     my $self = shift;
110              
111 0           return DBI->connect(@{ $self->connect_info });
  0            
112             }
113              
114             sub dbh
115             {
116 0     0     my $self = shift;
117 0           my $dbh = $self->_dbh;
118              
119 0           my $pid = $self->_connect_pid;
120 0 0 0       if ( ($pid || '') ne $$ || ! $dbh || ! $dbh->ping) {
      0        
      0        
121 0 0         $self->auto_reconnect or die "not connect";
122 0           $dbh = $self->_connect();
123 0           $self->_dbh( $dbh );
124 0           $self->_connect_pid($$);
125             }
126 0           return $dbh;
127             }
128              
129             sub next
130             {
131 0     0     my $self = shift;
132 0           my @args = @_;
133              
134             # First, undef any cached table name that we might have had
135 0           $self->__table(undef);
136              
137 0           my @tables =
138 0           grep { !/^\d+$/ }
139             map {
140 0           (my $v = $_) =~ s/:.*$//;
141 0           $v
142             }
143             @args
144             ;
145              
146             # Cache this statement handler so we don't unnecessarily create
147             # string or handles
148 0           my $dbh = $self->dbh;
149 0           my $sql = sprintf(
150             "SELECT queue_wait(%s)",
151             join(',', (('?') x scalar(@args)))
152             );
153 0           my ($index) = $dbh->selectrow_array($sql, undef, @args);
154              
155 0 0 0       my $table = defined $index && $index > 0 ? $tables[$index - 1] : undef;
156             my $res = Queue::Q4M::Result->new(
157             rv => defined $table,
158             table => $table,
159 0     0     on_release => sub { $self->__table(undef) }
160 0           );
161              
162 0 0         if (defined $table) {
163 0           $self->__table($table);
164             }
165 0 0         $self->__res($res) if $res;
166 0           $self->owner_mode(1);
167 0           return $res;
168             }
169              
170             *fetch = \&fetch_array;
171              
172             BEGIN
173             {
174             foreach my $type qw(array arrayref hashref) {
175             eval sprintf( <<'EOSUB', $type, $type );
176             sub fetch_%s {
177             my $self = shift;
178             my $table = shift;
179             $table ||= $self->__table;
180             if (Scalar::Util::blessed $table &&
181             $table->isa('Queue::Q4M::Result'))
182             {
183             $table = $table->[1];
184             }
185              
186             $table or die "no table";
187              
188             my ($sql, @bind) = $self->sql_maker->select($table, @_);
189             my $dbh = $self->dbh;
190             $self->owner_mode(0);
191             return $dbh->selectrow_%s($sql, undef, @bind);
192             }
193             EOSUB
194             die if $@;
195             }
196             }
197              
198             sub insert
199             {
200             my $self = shift;
201             my $table = shift;
202              
203             my ($sql, @bind) = $self->sql_maker->insert($table, @_);
204             my $dbh = $self->dbh;
205             my $sth = $dbh->prepare($sql);
206             my $rv = $sth->execute(@bind);
207             $sth->finish;
208             return $rv;
209             }
210              
211             sub disconnect
212             {
213             my $self = shift;
214             my $dbh = $self->dbh;
215             if ($dbh) {
216             $dbh->do("select queue_end()");
217             $dbh->disconnect;
218             $self->_dbh(undef);
219             }
220             }
221              
222             sub clear
223             {
224             my ($self, $table) = @_;
225             return $self->dbh->do("DELETE FROM $table");
226             }
227              
228             sub status {
229             return Queue::Q4M::Status->fetch( shift->dbh );
230             }
231              
232             sub DEMOLISH
233             {
234             my $self = shift;
235             local $@;
236             eval {
237             $self->dbh->do("SELECT queue_abort()") if $self->owner_mode;
238             $self->disconnect;
239             };
240             }
241              
242             package
243             Queue::Q4M::Result;
244             use overload
245             bool => \&as_bool,
246             '""' => \&as_string,
247             fallback => 1
248             ;
249             use Scope::Guard;
250              
251             sub new
252             {
253             my $class = shift;
254             my %args = @_;
255             return bless [ $args{rv}, $args{table}, Scope::Guard->new( $args{on_release} ) ], $class;
256             }
257              
258             sub as_bool { $_[0]->[0] }
259             sub as_string { $_[0]->[1] }
260             sub DESTROY { $_[0]->[2]->dismiss(1) if $_[0]->[2] }
261              
262             1;
263              
264             __END__