File Coverage

blib/lib/Queue/Q4M/Worker.pm
Criterion Covered Total %
statement 18 110 16.3
branch 1 28 3.5
condition 1 3 33.3
subroutine 6 20 30.0
pod 0 7 0.0
total 26 168 15.4


line stmt bran cond sub pod time code
1             package Queue::Q4M::Worker;
2 1     1   956 use strict;
  1         2  
  1         43  
3 1     1   29716 use DBI;
  1         36535  
  1         99  
4 1     1   1163 use POSIX qw(:signal_h);
  1         15887  
  1         11  
5 1     1   4284 use Time::HiRes ();
  1         2322  
  1         49  
6             use Class::Accessor::Lite
7 1         9 rw => [ qw(
8             before_loop_cb
9             dbh
10             delay
11             loop_iteration_cb
12             max_workers
13             min_requests_per_child
14             max_requests_per_child
15             signal_received
16             sql
17             _work_once
18             ) ]
19 1     1   987 ;
  1         1223  
20              
21             our $VERSION = '0.06';
22              
23             my $GUARD_CB;
24             BEGIN {
25 1 50 33 1   331 if ( eval { require Scope::Guard } && !$@ ) {
  1         981  
26 1         1808 $GUARD_CB = \&Scope::Guard::guard;
27             } else {
28             *Queue::Q4M::Worker::Guard::DESTROY = sub {
29 0 0         if (! $_[0][0]) {
30 0           $_[0]->();
31             }
32 0           };
33 0           $GUARD_CB = sub { bless [ 1, $_[0] ], 'Queue::Q4M::Worker::Guard' };
  0            
34             }
35             }
36              
37             sub new {
38 0     0 0   my ($class, %args) = @_;
39              
40 0           bless {
41             max_workers => 0,
42             max_requests_per_child => 10_000,
43             min_requests_per_child => 0,
44             _work_once => delete $args{work_once},
45             %args
46             }, $class;
47             }
48              
49             sub _get_sql {
50 0     0     my $self = shift;
51 0           my $sql = $self->sql;
52              
53 0           my ($stmt, @binds);
54 0 0         if (ref $sql eq 'CODE') {
55 0           ($stmt, @binds) = $sql->($self);
56             } else {
57 0           $stmt = $sql;
58             }
59 0           return ($stmt, @binds);
60             }
61              
62              
63             sub _get_before_loop_guard {
64 0     0     my $self = shift;
65 0           my $cb = $self->before_loop_cb();
66 0 0         if ($cb) {
67 0           return $cb->($self);
68             }
69             }
70              
71             sub _get_loop_iteration_guard {
72 0     0     my $self = shift;
73 0           my $cb = $self->loop_iteration_cb();
74 0 0         if ($cb) {
75 0           return $cb->($self);
76             }
77             }
78              
79             sub _get_dbh {
80 0     0     my $self = shift;
81 0           my $dbh = $self->dbh;
82              
83 0           my $handle;
84 0 0         if ( ref $dbh eq 'CODE' ) {
85 0           $handle = $dbh->($self);
86             } else {
87 0           $handle = $dbh;
88             }
89 0           return $handle;
90             }
91              
92             sub work_once {
93 0     0 0   my $self = shift;
94 0 0         if ( my $cb = $self->_work_once) {
95 0           return $cb->( $self, @_ );
96             }
97             }
98              
99             # XXX can we process more jobs?
100 0     0 0   sub should_process_more { $_[0]->{stop_at} > $_[0]->{processed} }
101              
102             sub should_loop {
103 0 0   0 0   $_[0]->should_process_more &&
104             ! $_[0]->signal_received
105             }
106              
107             sub work {
108 0     0 0   my $self = shift;
109              
110 0 0         if ( $self->max_workers > 1 ) {
111 0           $self->run_multi();
112             } else {
113 0           $self->run_single();
114             }
115             }
116              
117             # Run multiple children using Parallel::Prefork (if you want more
118             # control over how this is done, please subclass).
119             sub run_multi {
120 0     0 0   my $self = shift;
121 0           require Parallel::Prefork;
122 0           my $pp = Parallel::Prefork->new({
123             max_workers => $self->max_workers,
124             trap_signals => {
125             TERM => 'TERM',
126             HUP => 'TERM',
127             }
128             });
129              
130 0           while ( $pp->signal_received ne 'TERM' ) {
131 0     0     $pp->start(sub { $self->run_single });
  0            
132             }
133              
134             $pp->wait_all_children()
135 0           }
136              
137             sub run_single {
138 0     0 0   my $self = shift;
139              
140 0           my $min_requests = $self->min_requests_per_child;
141 0           my $max_requests = $self->max_requests_per_child;
142              
143             # WTF? min_requests can't be 0
144 0 0         if ($min_requests < 0) {
145 0           $min_requests = 0;
146             }
147              
148             # WTF? max_requests must be > min_requests
149             # arbitrarily choose min + 5_000
150 0 0         if ($max_requests <= $min_requests) {
151 0           $max_requests = $min_requests + 5000;
152             }
153 0           my $stop_at = int(rand($max_requests));
154 0           $self->{stop_at} = $stop_at;
155              
156 0           my $dbh;
157             my $sth;
158 0           my $sigset = POSIX::SigSet->new( SIGINT, SIGQUIT, SIGTERM );
159             my $cancel_q4m = POSIX::SigAction->new(sub {
160 0     0     my $signame = shift;
161 0           eval { $sth->cancel };
  0            
162 0           eval { $dbh->disconnect };
  0            
163 0           $self->signal_received( $signame );
164 0           }, $sigset, &POSIX::SA_NOCLDSTOP);
165             my $install_sig = sub {
166             # XXX use SigSet to properly interrupt the process
167 0     0     POSIX::sigaction( SIGINT, $cancel_q4m );
168 0           POSIX::sigaction( SIGQUIT, $cancel_q4m );
169 0           POSIX::sigaction( SIGTERM, $cancel_q4m );
170 0           };
171              
172 0           $install_sig->();
173              
174             # Run arbitrary code before loop. Optionally return a guard object
175 0           my $before_loop = $self->_get_before_loop_guard();
176 0           my $default_sig = POSIX::SigAction->new('DEFAULT');
177 0           while ($self->should_loop) {
178             # This is entirely optional. If you want do something that only
179             # has an effect during this particular iteration of the loop,
180             # you can create a guard here.
181 0           my $guard = $self->_get_loop_iteration_guard();
182              
183             # This may seem like a waste, but sometimes you have multiple queues
184             # to fetch from, and you want multiplex between each database, so
185             # we fetch the database per-iteration
186 0           $dbh = $self->_get_dbh();
187              
188 0           my ($stmt, @binds) = $self->_get_sql();
189 0           $sth = $dbh->prepare($stmt);
190              
191 0           my $rv = $sth->execute( @binds );
192 0 0         if ( $rv == 0 ) { # nothing
193 0           $sth->finish;
194 0           next;
195             }
196              
197 0 0         if ( my $h = $sth->fetchrow_hashref ) {
198 0           $self->{processed}++;
199 0           $dbh->do("SELECT queue_end()");
200              
201             # while the consumer is working, we need to reset the
202             # signal handlers that we previously set
203 0           my $gobj = $GUARD_CB->($install_sig);
204 0           POSIX::sigaction( SIGINT, $default_sig );
205 0           POSIX::sigaction( SIGQUIT, $default_sig );
206 0           POSIX::sigaction( SIGTERM, $default_sig );
207              
208 0           $self->work_once( $h );
209             }
210 0 0         if (my $delay = $self->delay) {
211 0           Time::HiRes::sleep(rand($delay));
212             }
213             }
214 0           POSIX::sigaction( SIGINT, $default_sig );
215 0           POSIX::sigaction( SIGQUIT, $default_sig );
216 0           POSIX::sigaction( SIGTERM, $default_sig );
217             }
218              
219             1;
220              
221             __END__