File Coverage

blib/lib/TheSchwartz/Simple.pm
Criterion Covered Total %
statement 18 104 17.3
branch 0 38 0.0
condition 0 2 0.0
subroutine 6 13 46.1
pod 0 5 0.0
total 24 162 14.8


line stmt bran cond sub pod time code
1             package TheSchwartz::Simple;
2              
3 1     1   7 use strict;
  1         2  
  1         45  
4 1     1   13 use 5.8.1;
  1         3  
  1         72  
5             our $VERSION = '0.05';
6              
7 1     1   6 use Carp;
  1         1  
  1         97  
8 1     1   5 use Scalar::Util qw( refaddr );
  1         2  
  1         110  
9 1     1   1240 use Storable;
  1         4178  
  1         70  
10 1     1   766 use TheSchwartz::Simple::Job;
  1         2  
  1         1323  
11              
12             sub new {
13 0     0 0   my $class = shift;
14 0           my ($dbhs) = @_;
15 0 0         $dbhs = [$dbhs] unless ref $dbhs eq 'ARRAY';
16 0           bless {
17             databases => $dbhs,
18             _funcmap => {},
19             prefix => "",
20             }, $class;
21             }
22              
23             sub insert {
24 0     0 0   my $self = shift;
25              
26 0           my $job;
27 0 0         if ( ref $_[0] eq 'TheSchwartz::Simple::Job' ) {
28 0           $job = $_[0];
29             }
30             else {
31 0           $job = TheSchwartz::Simple::Job->new_from_array(@_);
32             }
33 0 0         $job->arg( Storable::nfreeze( $job->arg ) ) if ref $job->arg;
34              
35 0           for my $dbh ( @{ $self->{databases} } ) {
  0            
36 0           my $jobid;
37 0           eval {
38 0           $job->funcid( $self->funcname_to_id( $dbh, $job->funcname ) );
39 0           $job->insert_time(time);
40              
41 0           my $row = $job->as_hashref;
42 0           my @col = keys %$row;
43              
44 0           my $sql = sprintf 'INSERT INTO %sjob (%s) VALUES (%s)',
45             $self->{prefix},
46             join( ", ", @col ), join( ", ", ("?") x @col );
47              
48 0           my $sth = $dbh->prepare_cached($sql);
49 0           my $i = 1;
50 0           for my $col (@col) {
51 0           $sth->bind_param(
52             $i++,
53             $row->{$col},
54             _bind_param_attr( $dbh, $col ),
55             );
56             }
57 0           $sth->execute();
58              
59 0           $jobid = _insert_id( $dbh, $sth, "job", "jobid" );
60             };
61              
62 0 0         return $jobid if defined $jobid;
63             }
64              
65 0           return;
66             }
67              
68             sub funcname_to_id {
69 0     0 0   my ( $self, $dbh, $funcname ) = @_;
70              
71 0           my $dbid = refaddr $dbh;
72 0 0         unless ( exists $self->{_funcmap}{$dbid} ) {
73 0           my $sth
74             = $dbh->prepare_cached("SELECT funcid, funcname FROM $self->{prefix}funcmap");
75 0           $sth->execute;
76 0           while ( my $row = $sth->fetchrow_arrayref ) {
77 0           $self->{_funcmap}{$dbid}{ $row->[1] } = $row->[0];
78             }
79 0           $sth->finish;
80             }
81              
82 0 0         unless ( exists $self->{_funcmap}{$dbid}{$funcname} ) {
83             ## This might fail in a race condition since funcname is UNIQUE
84 0           my $sth = $dbh->prepare_cached(
85             "INSERT INTO $self->{prefix}funcmap (funcname) VALUES (?)");
86 0           eval { $sth->execute($funcname) };
  0            
87              
88 0           my $id = _insert_id( $dbh, $sth, "funcmap", "funcid" );
89              
90             ## If we got an exception, try to load the record again
91 0 0         if ($@) {
92 0           my $sth = $dbh->prepare_cached(
93             "SELECT funcid FROM $self->{prefix}funcmap WHERE funcname = ?");
94 0           $sth->execute($funcname);
95 0 0         $id = $sth->fetchrow_arrayref->[0]
96             or croak "Can't find or create funcname $funcname: $@";
97             }
98              
99 0           $self->{_funcmap}{$dbid}{$funcname} = $id;
100             }
101              
102 0           $self->{_funcmap}{$dbid}{$funcname};
103             }
104              
105             sub _insert_id {
106 0     0     my ( $dbh, $sth, $table, $col ) = @_;
107              
108 0           my $driver = $dbh->{Driver}{Name};
109 0 0         if ( $driver eq 'mysql' ) {
    0          
    0          
110 0           return $dbh->{mysql_insertid};
111             }
112             elsif ( $driver eq 'Pg' ) {
113 0           return $dbh->last_insert_id( undef, undef, undef, undef,
114             { sequence => join( "_", $table, $col, 'seq' ) } );
115             }
116             elsif ( $driver eq 'SQLite' ) {
117 0           return $dbh->func('last_insert_rowid');
118             }
119             else {
120 0           croak "Don't know how to get last insert id for $driver";
121             }
122             }
123              
124             sub list_jobs {
125 0     0 0   my ( $self, $arg ) = @_;
126              
127 0 0         die "No funcname" unless exists $arg->{funcname};
128              
129 0           my @options;
130 0 0         push @options, {
131             key => 'run_after',
132             op => '<=',
133             value => $arg->{run_after}
134             } if exists $arg->{run_after};
135 0 0         push @options, {
136             key => 'grabbed_until',
137             op => '<=',
138             value => $arg->{grabbed_until}
139             } if exists $arg->{grabbed_until};
140              
141 0 0         if ( $arg->{coalesce} ) {
142 0   0       $arg->{coalesce_op} ||= '=';
143 0           push @options, {
144             key => 'coalesce',
145             op => $arg->{coalesce_op},
146             value => $arg->{coalesce}
147             };
148             }
149              
150 0           my @jobs;
151 0           for my $dbh ( @{ $self->{databases} } ) {
  0            
152 0           eval {
153 0           my $funcid = $self->funcname_to_id( $dbh, $arg->{funcname} );
154              
155 0           my $sql = "SELECT * FROM $self->{prefix}job WHERE funcid = ?";
156 0           my @value = ($funcid);
157 0           for (@options) {
158 0           $sql .= " AND $_->{key} $_->{op} ?";
159 0           push @value, $_->{value};
160             }
161              
162 0           my $sth = $dbh->prepare_cached($sql);
163 0           $sth->execute(@value);
164 0           while ( my $ref = $sth->fetchrow_hashref ) {
165 0           push @jobs, TheSchwartz::Simple::Job->new($ref);
166             }
167             };
168             }
169              
170 0           return @jobs;
171             }
172              
173             sub _bind_param_attr {
174 0     0     my ( $dbh, $col ) = @_;
175              
176 0 0         return if $col ne 'arg';
177              
178 0           my $driver = $dbh->{Driver}{Name};
179 0 0         if ( $driver eq 'Pg' ) {
    0          
180 0           return { pg_type => DBD::Pg::PG_BYTEA() };
181             }
182             elsif ( $driver eq 'SQLite' ) {
183 0           return DBI::SQL_BLOB();
184             }
185 0           return;
186             }
187              
188             sub prefix {
189 0     0 0   my $self = shift;
190              
191 0 0         $self->{prefix} = shift if @_;
192 0           return $self->{prefix};
193             }
194              
195              
196             1;
197             __END__