File Coverage

blib/lib/Pg/Queue.pm
Criterion Covered Total %
statement 15 43 34.8
branch 0 2 0.0
condition 0 3 0.0
subroutine 5 10 50.0
pod 5 5 100.0
total 25 63 39.6


line stmt bran cond sub pod time code
1             package Pg::Queue;
2              
3 2     2   49333 use strict;
  2         6  
  2         67  
4 2     2   12 use warnings;
  2         5  
  2         66  
5              
6 2     2   1109 use DBI;
  2         15893  
  2         128  
7 2     2   919 use Moo;
  2         20498  
  2         14  
8              
9 2     2   3979 use version; our $VERSION = qv('1.0');
  2         4221  
  2         15  
10              
11             has dbh => (
12             is => 'rw',
13             required => 1,
14             );
15              
16             has queuename => (
17             is => 'rw',
18             default => sub { "queuetest" },
19             );
20              
21              
22             sub create_queue_table {
23 0     0 1   my( $self ) = @_;
24 0           my $dbh = $self->dbh;
25 0           my $name = $self->queuename;
26              
27 0           $dbh->do("DROP TABLE IF EXISTS $name");
28              
29 0           $dbh->do(<
30             CREATE TABLE $name (
31             id SERIAL PRIMARY KEY,
32             available BOOLEAN DEFAULT TRUE,
33             processed TIMESTAMP,
34             item TEXT
35             )
36             SQL
37              
38 0           $dbh->do("CREATE INDEX ${name}_available_idx ON $name (id) WHERE available");
39             }
40              
41             sub add_work_item {
42 0     0 1   my( $self, $item ) = @_;
43              
44 0           $self->dbh->do("INSERT INTO ${\$self->queuename} (item) VALUES (?)",undef, $item);
  0            
45             }
46              
47             sub pull_work_item {
48 0     0 1   my( $self, $callback ) = @_;
49 0           my $dbh = $self->dbh;
50 0           my $name = $self->queuename;
51              
52 0           $dbh->do("BEGIN");
53              
54 0           my $row = $dbh->selectrow_arrayref(
55             "UPDATE $name SET available=false
56             WHERE id = (
57             SELECT id FROM $name
58             WHERE available
59             ORDER BY id
60             LIMIT 1
61             FOR UPDATE SKIP LOCKED
62             ) RETURNING id,item"
63             );
64              
65 0 0 0       if( $row and $callback->(@$row) ) {
66 0           $dbh->do("UPDATE $name SET processed=NOW() where id = ?", undef, $row->[0]);
67 0           $dbh->do("COMMIT");
68 0           return 1; #TRUE
69             }
70             else {
71 0           $dbh->do("ROLLBACK");
72 0           return 0; #FALSE
73             }
74             }
75              
76             sub count_total {
77 0     0 1   my( $self ) = @_;
78              
79 0           my $row = $self->dbh->selectrow_arrayref( "SELECT count(*) FROM ${\$self->queuename}" );
  0            
80              
81 0           return $row->[0];
82             }
83              
84             sub count_available {
85 0     0 1   my( $self ) = @_;
86              
87 0           my $row = $self->dbh->selectrow_arrayref( "SELECT count(*) FROM ${\$self->queuename} WHERE available" );
  0            
88              
89 0           return $row->[0];
90             }
91              
92              
93             1;
94             __END__