File Coverage

blib/lib/cPanel/TaskQueue/Scheduler.pm
Criterion Covered Total %
statement 194 220 88.1
branch 80 112 71.4
condition 24 39 61.5
subroutine 33 33 100.0
pod 16 16 100.0
total 347 420 82.6


line stmt bran cond sub pod time code
1             package cPanel::TaskQueue::Scheduler;
2             {
3             $cPanel::TaskQueue::Scheduler::VERSION = '0.606';
4             }
5              
6             # cpanel - cPanel/TaskQueue/Scheduler.pm Copyright(c) 2014 cPanel, Inc.
7             # All rights Reserved.
8             # copyright@cpanel.net http://cpanel.net
9             #
10             # Redistribution and use in source and binary forms, with or without
11             # modification, are permitted provided that the following conditions are met:
12             # * Redistributions of source code must retain the above copyright
13             # notice, this list of conditions and the following disclaimer.
14             # * Redistributions in binary form must reproduce the above copyright
15             # notice, this list of conditions and the following disclaimer in the
16             # documentation and/or other materials provided with the distribution.
17             # * Neither the name of the owner nor the names of its contributors may
18             # be used to endorse or promote products derived from this software
19             # without specific prior written permission.
20             #
21             # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
22             # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23             # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
24             # DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY
25             # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26             # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27             # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
28             # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29             # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30             # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31              
32             # This module handles queuing of tasks for execution. The queue is persistent
33             # and handles consolidating of duplicate tasks.
34              
35 19     19   273080 use strict;
  19         41  
  19         662  
36              
37             #use warnings;
38 19     19   7144 use cPanel::TaskQueue ();
  19         48  
  19         366  
39 19     19   145 use cPanel::TaskQueue::Task();
  19         37  
  19         328  
40 19     19   93 use cPanel::StateFile ();
  19         34  
  19         65578  
41              
42             # -----------------------------------------------------------------------------
43             # Policy code: The following is a little weird because its intent is to
44             # change the policy by which some code is executed, without adding a gratuitous
45             # object and polymorphism into the mix.
46             #
47             # I had originally redefined the methods, but that seems a little too magical
48             # when indirecting through goto works as well (if a little slower).
49              
50             # These methods are intended to help document the importance of the message and
51             # to supply a 'seam' that could be used to modify the logging behavior of the
52             # StateFile.
53             my $are_policies_set = 0;
54             my $the_serializer;
55             my $pkg = __PACKAGE__;
56              
57             #
58             # This method allows changing the policies for logging and locking.
59             sub import {
60 9     9   1625 my $class = shift;
61 9 50       54 die "Not an even number of arguments to the $pkg module\n" if @_ % 2;
62 9 100       75 die "Policies already set elsewhere\n" if $are_policies_set;
63 7 100       6725 return 1 unless @_; # Don't set the policies flag.
64              
65 4         16 while (@_) {
66 5         17 my ( $policy, $module ) = splice( @_, 0, 2 );
67 5         9 my @methods = ();
68 5 100       27 if ( '-logger' eq $policy ) {
    50          
69 3         27 cPanel::StateFile->import( '-logger' => $module );
70             }
71             elsif ( '-serializer' eq $policy ) {
72 2         7 _load_serializer_module($module);
73 2         12 $the_serializer = $module;
74             }
75             else {
76 0         0 die "Unrecognized policy '$policy'\n";
77             }
78             }
79 4         10 $are_policies_set = 1;
80 4         5306 return 1;
81             }
82              
83             sub _load_serializer_module {
84 2     2   4 my ($module) = @_;
85 2 50       7 die "Supplied serializer must be a module name.\n" if ref $module;
86 2 50       20 die "'$module' does not look like a serializer" unless $module =~ m{^\w+(?:::\w+)*$};
87 2         126 eval "use $module;"; ## no critic (ProhibitStringyEval)
88 2 50       15 die $@ if $@;
89 2 50       9 die 'Supplied serializer object does not support the correct interface.'
90             unless _valid_serializer($module);
91 2         6 return;
92             }
93              
94             sub _valid_serializer {
95 2     2   6 my ($serializer) = @_;
96 2         6 foreach my $method (qw/load save filename/) {
97 6 50       10 return unless eval { $serializer->can($method) };
  6         65  
98             }
99 2         14 return 1;
100             }
101              
102             sub _get_serializer {
103 35 100   35   105 unless ( defined $the_serializer ) {
104 6     8   641 eval 'use cPanel::TQSerializer::Storable;'; ## no critic (ProhibitStringyEval)
  8         6230  
  8         28  
  8         189  
105 6 50       175 cPanel::StateFile->_throw(@_) if $@;
106 6         22 $the_serializer = 'cPanel::TQSerializer::Storable';
107             }
108 35         219 return $the_serializer;
109             }
110              
111             # Replacement for List::Util::first, so I don't need to bring in the whole module.
112             sub _first (&@) { ## no critic(ProhibitSubroutinePrototypes)
113 8     8   12 my $pred = shift;
114 8         9 local $_;
115 8         18 foreach (@_) {
116 13 100       22 return $_ if $pred->();
117             }
118 1         6 return;
119             }
120              
121             # Namespace value used when creating unique task ids.
122             my $tasksched_uuid = 'TaskQueue-Scheduler';
123              
124             {
125             my $FILETYPE = 'TaskScheduler'; # Identifier at the beginning of the state file
126             my $CACHE_VERSION = 2; # Cache file version number.
127              
128             # Disk Cache & state file.
129             #
130 8     8 1 4231 sub get_name { $_[0]->{scheduler_name}; }
131              
132             # --------------------------------------
133             # Class methods
134              
135             # Initialize parameters.
136             sub new {
137 29     29 1 23216 my ( $class, $args_ref ) = @_;
138 29         225 my $self = bless {
139             next_id => 1,
140             time_queue => [],
141             disk_state => undef,
142             }, $class;
143              
144 29 50       140 if ( defined $args_ref->{serial} ) {
145 0         0 _load_serializer_module( $args_ref->{serial} );
146 0         0 $self->{serializer} = $args_ref->{serial};
147             }
148 29   33     240 $self->{serializer} ||= _get_serializer();
149 29 100       104 if ( exists $args_ref->{token} ) {
150 16         60 my ( $version, $name, $file ) = split( ':\|:', $args_ref->{token} );
151              
152             # have all parts
153 16 100 100     121 cPanel::StateFile->_throw('Invalid token.')
      100        
154             unless defined $version
155             and defined $name
156             and defined $file;
157              
158             # all parts make sense.
159 6         13 my $name_match = _get_serializer()->filename("${name}_sched");
160 6 100 100     134 cPanel::StateFile->_throw('Invalid token.')
161             unless 'tqsched1' eq $version and $file =~ m{/\Q$name_match\E$};
162              
163 2         6 $self->{scheduler_name} = $name;
164 2         8 $self->{disk_state_file} = $file;
165             }
166             else {
167 13 50 0     55 $args_ref->{state_dir} ||= $args_ref->{cache_dir} if exists $args_ref->{cache_dir};
168 13 100       68 cPanel::StateFile->_throw('No caching directory supplied.') unless exists $args_ref->{state_dir};
169 12 100       66 cPanel::StateFile->_throw('No scheduler name supplied.') unless exists $args_ref->{name};
170              
171 11         96 $self->{disk_state_file} = $self->_serializer()->filename("$args_ref->{state_dir}/$args_ref->{name}_sched");
172 11         43 $self->{scheduler_name} = $args_ref->{name};
173             }
174              
175             # Make a disk file to track the object.
176 13 50       146 my $state_args = {
    50          
    50          
177             state_file => $self->{disk_state_file}, data_obj => $self,
178              
179             # Deprecated version
180             exists $args_ref->{cache_timeout} ? ( timeout => $args_ref->{cache_timeout} ) : (),
181             exists $args_ref->{state_timeout} ? ( timeout => $args_ref->{state_timeout} ) : (),
182             exists $args_ref->{logger} ? ( logger => $args_ref->{logger} ) : (),
183             };
184             eval {
185 13         117 $self->{disk_state} = cPanel::StateFile->new($state_args);
186 9         38 1;
187 13 100       29 } or do {
188 4         9 my $ex = $@;
189              
190             # If not a loading error, rethrow.
191 4 50       47 cPanel::StateFile->_throw($ex) unless $ex =~ /Not a recognized|Invalid version/;
192 4         29 cPanel::StateFile->_warn($ex);
193 4         53 cPanel::StateFile->_warn("Moving bad state file and retry.\n");
194 4         58 cPanel::StateFile->_notify(
195             'Unable to load TaskQueue::Scheduler metadata',
196             "Loading of [$self->{disk_state_file}] failed: $ex\n" . "Moving bad file to [$self->{disk_state_file}.broken] and retrying.\n"
197             );
198 4         105 unlink "$self->{disk_state_file}.broken";
199 4         197 rename $self->{disk_state_file}, "$self->{disk_state_file}.broken";
200              
201 4         16 $self->{disk_state} = cPanel::StateFile->new($state_args);
202             };
203 13         66 return $self;
204             }
205              
206             sub throw {
207 10     10 1 19 my $self = shift;
208 10 100       77 return $self->{disk_state} ? $self->{disk_state}->throw(@_) : cPanel::StateFile->_throw(@_);
209             }
210              
211             # Not using warn, so don't define it.
212             sub info {
213 1     1 1 2 my $self = shift;
214 1 50       10 return $self->{disk_state} ? $self->{disk_state}->info(@_) : undef;
215             }
216              
217             # -------------------------------------------------------
218             # Pseudo-private methods. Should not be called except under unusual circumstances.
219             sub _serializer {
220 69     69   123 my ($self) = @_;
221 69         609 return $self->{serializer};
222             }
223              
224             sub _state_file {
225 1     1   612 my ($self) = @_;
226 1         11 return $self->{disk_state_file};
227             }
228              
229             # -------------------------------------------------------
230             # Public methods
231             sub load_from_cache {
232 9     9 1 19 my ( $self, $fh ) = @_;
233              
234 9         34 local $/;
235 9         33 my ( $magic, $version, $meta ) = $self->_serializer()->load($fh);
236              
237 9 100 100     325 $self->throw("Not a recognized TaskQueue Scheduler state file.\n") unless defined $magic and $magic eq $FILETYPE;
238 7 100 66     65 $self->throw("Invalid version of TaskQueue Scheduler state file.\n") unless defined $version and $version eq $CACHE_VERSION;
239              
240             # Next id should continue increasing.
241             # (We might want to deal with wrap-around at some point.)
242 5 100       23 $self->{next_id} = $meta->{nextid} if $meta->{nextid} > $self->{next_id};
243              
244             # Clean queues that have been read from disk.
245 5         7 $self->{time_queue} = [ grep { _is_item_sane($_) } @{ $meta->{waiting_queue} } ];
  1         5  
  5         17  
246              
247 5         29 return 1;
248             }
249              
250             sub save_to_cache {
251 49     49 1 77 my ( $self, $fh ) = @_;
252              
253 49         218 my $meta = {
254             nextid => $self->{next_id},
255             waiting_queue => $self->{time_queue},
256             };
257 49         147 return $self->_serializer()->save( $fh, $FILETYPE, $CACHE_VERSION, $meta );
258             }
259              
260             sub schedule_task {
261 27     27 1 11507 my ( $self, $command, $args ) = @_;
262              
263 27 100       71 $self->throw('Cannot queue an empty command.') unless defined $command;
264 26 100 100     148 $self->throw('Args is not a hash ref.') unless defined $args and 'HASH' eq ref $args;
265              
266 24         36 my $time = time;
267 24 100       99 $time += $args->{delay_seconds} if exists $args->{delay_seconds};
268 24 100       67 $time = $args->{at_time} if exists $args->{at_time};
269              
270 24 100       28 if ( eval { $command->isa('cPanel::TaskQueue::Task') } ) {
  24         211  
271 1 50       5 if ( 0 == $command->retries_remaining() ) {
272 1         4 $self->info('Task with 0 retries not scheduled.');
273 1         16 return;
274             }
275 0         0 return $self->_schedule_the_task( $time, $command );
276             }
277              
278             # must have non-space characters to be a command.
279 23 100       95 $self->throw('Cannot queue an empty command.') unless $command =~ /\S/;
280              
281 22         36 my @retry_attrs = ();
282 22 100       59 if ( exists $args->{attempts} ) {
283 2 100       13 return unless $args->{attempts} > 0;
284 1         6 @retry_attrs = (
285             retries => $args->{attempts},
286             userdata => { sched => $self->get_token() }
287             );
288             }
289 21         204 my $task = cPanel::TaskQueue::Task->new(
290             {
291             cmd => $command, nsid => $tasksched_uuid, id => $self->{next_id}++,
292             @retry_attrs
293             }
294             );
295 21         92 return $self->_schedule_the_task( $time, $task );
296             }
297              
298             sub unschedule_task {
299 18     18 1 36 my ( $self, $uuid ) = @_;
300              
301 18 50       45 unless ( _is_valid_uuid($uuid) ) {
302 0         0 $self->throw('No Task uuid argument passed to unschedule_task.');
303             }
304              
305             # Lock the queue before we begin accessing it.
306 18         86 my $guard = $self->{disk_state}->synch();
307 18         27 my $old_count = @{ $self->{time_queue} };
  18         36  
308              
309 18         29 $self->{time_queue} = [ grep { $_->{task}->uuid() ne $uuid } @{ $self->{time_queue} } ];
  48         140  
  18         36  
310              
311             # All changes complete, save to disk.
312 18         171 $guard->update_file();
313 18         26 return $old_count > @{ $self->{time_queue} };
  18         89  
314             }
315              
316             sub is_task_scheduled {
317 5     5 1 1535 my ( $self, $uuid ) = @_;
318              
319 5 50       13 unless ( _is_valid_uuid($uuid) ) {
320 0         0 $self->throw('No Task uuid argument passed to is_task_scheduled.');
321             }
322              
323             # Update from disk, but don't worry about lock. Information only.
324 5         22 $self->{disk_state}->synch();
325              
326 5     7   20 return _first { $_->{task}->uuid() eq $uuid } @{ $self->{time_queue} };
  7         24  
  5         20  
327             }
328              
329             sub when_is_task_scheduled {
330 3     3 1 7 my ( $self, $uuid ) = @_;
331              
332 3 50       7 unless ( _is_valid_uuid($uuid) ) {
333 0         0 $self->throw('No Task uuid argument passed to when_is_task_scheduled.');
334             }
335              
336             # Update from disk, but don't worry about lock. Information only.
337 3         12 $self->{disk_state}->synch();
338              
339 3     6   16 my $task = _first { $_->{task}->uuid() eq $uuid } @{ $self->{time_queue} };
  6         20  
  3         9  
340 3 50       13 return unless defined $task;
341 3         15 return $task->{time};
342             }
343              
344             sub how_many_scheduled {
345 3     3 1 1519 my ($self) = @_;
346              
347             # Update from disk, but don't worry about lock. Information only.
348 3         14 $self->{disk_state}->synch();
349 3         5 return scalar @{ $self->{time_queue} };
  3         23  
350             }
351              
352             sub peek_next_task {
353 28     28 1 468 my ($self) = @_;
354              
355             # Update from disk, but don't worry about lock. Information only.
356 28         105 $self->{disk_state}->synch();
357 28 100       39 return unless @{ $self->{time_queue} };
  28         106  
358              
359 22         134 return $self->{time_queue}->[0]->{task}->clone();
360             }
361              
362             sub seconds_until_next_task {
363 4     4 1 12 my ($self) = @_;
364              
365             # Update from disk, but don't worry about lock. Information only.
366 4         17 $self->{disk_state}->synch();
367 4 100       7 return unless @{ $self->{time_queue} };
  4         19  
368              
369 3         16 return $self->{time_queue}->[0]->{time} - time;
370             }
371              
372             sub process_ready_tasks {
373 2     2 1 896 my ( $self, $queue ) = @_;
374              
375 2 50 66     11 unless ( defined $queue and eval { $queue->can('queue_task') } ) {
  1         14  
376 2         8 $self->throw('No valid queue supplied.');
377             }
378              
379             # Don't generate lock yet, we may not need one.
380 0         0 $self->{disk_state}->synch();
381 0         0 my $count = 0;
382 0         0 my $guard;
383 0         0 eval {
384 0         0 while ( @{ $self->{time_queue} } ) {
  0         0  
385 0         0 my $item = $self->{time_queue}->[0];
386              
387 0 0       0 last if time < $item->{time};
388 0 0       0 if ( !$guard ) {
389              
390             # Now we know we'll be changing the schedule, so we need to
391             # lock it.
392 0   0     0 $guard ||= $self->{disk_state}->synch();
393 0         0 next;
394             }
395              
396             # Should be safe from deadlock unless queue calls back to me.
397 0         0 $queue->queue_task( $item->{task} );
398 0         0 ++$count;
399              
400             # Only remove from the schedule when the queue has processed it.
401 0         0 shift @{ $self->{time_queue} };
  0         0  
402             }
403             };
404 0         0 my $ex = $@;
405 0 0 0     0 $guard->update_file() if $count && $guard;
406 0 0       0 die $ex if $ex;
407              
408 0         0 return $count;
409             }
410              
411             sub get_token {
412 3     3 1 19 my ( $self, $command, $time ) = @_;
413              
414 3         19 return join( ':|:', 'tqsched1', $self->{scheduler_name}, $self->{disk_state_file} );
415             }
416              
417             sub snapshot_task_schedule {
418 1     1 1 3 my ($self) = @_;
419              
420 1         5 $self->{disk_state}->synch();
421              
422             return [
423 3         14 map {
424 1         4 { time => $_->{time}, task => $_->{task}->clone() }
425 1         2 } @{ $self->{time_queue} }
426             ];
427             }
428              
429             # ---------------------------------------------------------------
430             # Private Methods.
431             sub _schedule_the_task {
432 21     21   33 my ( $self, $time, $task ) = @_;
433              
434 21         88 my $guard = $self->{disk_state}->synch();
435 21         84 my $item = { time => $time, task => $task };
436              
437             # if the list is empty, or time after all in list.
438 21 100 100     27 if ( !@{ $self->{time_queue} } or $time >= $self->{time_queue}->[-1]->{time} ) {
  21 100       122  
439 17         20 push @{ $self->{time_queue} }, $item;
  17         31  
440             }
441             elsif ( $time < $self->{time_queue}->[0]->{time} ) {
442              
443             # schedule before anything in the list
444 3         4 unshift @{ $self->{time_queue} }, $item;
  3         7  
445             }
446             else {
447              
448             # find the correct spot in the list.
449 1         3 foreach my $i ( 1 .. $#{ $self->{time_queue} } ) {
  1         5  
450 3 100       14 next unless $self->{time_queue}->[$i]->{time} > $time;
451 1         2 splice( @{ $self->{time_queue} }, $i, 0, $item );
  1         4  
452 1         3 last;
453             }
454             }
455              
456 21         71 $guard->update_file();
457 21         80 return $task->uuid();
458             }
459              
460             sub _is_item_sane {
461 1     1   3 my ($item) = @_;
462 1 50       4 return unless 'HASH' eq ref $item;
463 1 50 33     8 return unless exists $item->{task} and exists $item->{time};
464 1         9 $item->{task} = cPanel::TaskQueue::Task->reconstitute( $item->{task} );
465 1 50       2 return unless eval { $item->{task}->isa('cPanel::TaskQueue::Task') };
  1         13  
466 1         10 return $item->{time} =~ /^\d+$/;
467             }
468              
469             sub _is_valid_uuid {
470 26     26   93 return cPanel::TaskQueue::Task::is_valid_taskid(shift);
471             }
472             }
473              
474             1;
475              
476             __END__