File Coverage

blib/lib/cPanel/TaskQueue/Ctrl.pm
Criterion Covered Total %
statement 44 266 16.5
branch 12 138 8.7
condition 7 19 36.8
subroutine 9 28 32.1
pod 16 16 100.0
total 88 467 18.8


line stmt bran cond sub pod time code
1             package cPanel::TaskQueue::Ctrl;
2             {
3             $cPanel::TaskQueue::Ctrl::VERSION = '0.606';
4             }
5              
6             # cpanel - cPanel/TaskQueue/Ctrl.pm Copyright(c) 2014 cPanel, Inc.
7             # All rights Reserved.
8             # copyright@cpanel.net http://cpanel.net
9             #
10             # Redistribution and use in source and binary forms, with or without
11             # modification, are permitted provided that the following conditions are met:
12             # * Redistributions of source code must retain the above copyright
13             # notice, this list of conditions and the following disclaimer.
14             # * Redistributions in binary form must reproduce the above copyright
15             # notice, this list of conditions and the following disclaimer in the
16             # documentation and/or other materials provided with the distribution.
17             # * Neither the name of the owner nor the names of its contributors may
18             # be used to endorse or promote products derived from this software
19             # without specific prior written permission.
20             #
21             # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
22             # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23             # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
24             # DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY
25             # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26             # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27             # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
28             # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29             # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30             # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31              
32 1     1   26317 use warnings;
  1         2  
  1         37  
33 1     1   6 use strict;
  1         2  
  1         36  
34              
35 1     1   701 use cPanel::TaskQueue ();
  1         3  
  1         20  
36 1     1   673 use cPanel::TaskQueue::Scheduler ();
  1         3  
  1         21  
37 1     1   687 use cPanel::TaskQueue::PluginManager ();
  1         3  
  1         19  
38 1     1   2520 use Text::Wrap ();
  1         7536  
  1         5306  
39              
40             my %format = (
41             storable => 'cPanel::TQSerializer::Storable',
42             yaml => 'cPanel::TQSerializer::YAML',
43             );
44              
45             my @required = qw(qdir qname);
46             my %validate = (
47             'qdir' => sub { return -d $_[0]; },
48             'qname' => sub { return defined $_[0] && length $_[0]; },
49             'sdir' => sub { return -d $_[0]; },
50             'sname' => sub { return defined $_[0] && length $_[0]; },
51             'logger' => sub { return 1; },
52             'out' => sub { return 1; },
53             'serial' => sub { return exists $format{ lc $_[0] }; },
54             );
55              
56             my %commands = (
57             queue => {
58             code => \&queue_tasks,
59             synopsis => 'queue "cmd string" ...',
60             help => ' Adds the specified commands to the TaskQueue. Prints the task id on
61             success or an error on failure. Multiple command strings may be supplied,
62             and each will be queued in turn.',
63             },
64             pause => {
65             code => sub { $_[2]->pause_processing(); return; },
66             synopsis => 'pause',
67             help => ' Pause the processing of waiting tasks from the TaskQueue.',
68             },
69             resume => {
70             code => sub { $_[2]->resume_processing(); return; },
71             synopsis => 'resume',
72             help => ' Resume the processing of waiting tasks from the TaskQueue.',
73             },
74             unqueue => {
75             code => \&unqueue_tasks,
76             synopsis => 'unqueue {taskid} ...',
77             help => ' Removes the tasks identified by taskids from the TaskQueue.',
78             },
79             schedule => {
80             code => \&schedule_tasks,
81             synopsis => 'schedule [at {time}] "cmd string" ... | schedule after {seconds} "cmd string" ...',
82             help => ' Schedule the specified commands for later execution. If the "at"
83             subcommand is used, the next arguemnt is expected to be a UNIX epoch time for the
84             command to be queued. The "after" subcommand specified a delay in seconds after
85             which the command is queued.',
86             },
87             unschedule => {
88             code => \&unschedule_tasks,
89             synopsis => 'unschedule {taskid} ...',
90             help => ' Removes the tasks identified by taskids from the TaskQueue Scheduler.',
91             },
92             list => {
93             code => \&list_tasks,
94             synopsis => 'list [verbose] [active|deferred|waiting|scheduled]',
95             help => ' List current outstanding tasks. With the verbose flag, list more
96             information on each task. Specify the specific subset of tasks to limit output.',
97             },
98             find => {
99             code => \&find_task,
100             synopsis => 'find task {taskid} | find command {text}',
101             help => ' Find a task in the queue by either task ID or a portion of the command
102             string.',
103             },
104             plugins => {
105             code => \&list_plugins,
106             synopsis => 'plugins [verbose]',
107             help => ' List the names of the plugins that have been loaded.',
108             },
109             commands => {
110             code => \&list_commands,
111             synopsis => 'commands [modulename]',
112             help => ' List the commands that are currently supported by the loaded plugins.
113             If a module name is supplied, only the commands from that plugin are displayed.',
114             },
115             status => {
116             code => \&queue_status,
117             synopsis => 'status',
118             help => ' Print the status of the Task Queue and Scheduler.',
119             },
120             convert => {
121             code => \&convert_state_files,
122             synopsis => 'convert {newformat}',
123             help => ' Convert the TaskQueue and Scheduler state files from the current format
124             to the newly specified format. Valid strings for the format are "storable" or
125             "yaml".'
126             },
127             info => {
128             code => \&display_queue_info,
129             synopsis => 'info',
130             help => ' Display current information about the TaskQueue, Scheduler, and the Ctrl
131             object.',
132             },
133             process => {
134             code => \&process_one_step,
135             synopsis => 'process [verbose] [scheduled|waiting]',
136             help => ' Process the requested queue items. If called with the "waiting" argument,
137             one waiting task is started if we have space in the active queue. If called with the
138             "scheduled" argument, any scheduled items that have reached their activation time will be
139             queued. Otherwise, both actions will be performed. Use the "verbose" flag for more output.'
140             },
141             );
142              
143             sub new {
144 4     4 1 1763 my ( $class, $args ) = @_;
145              
146 4 50       13 $args = {} unless defined $args;
147 4 100       16 die "Argument to new is not a hashref.\n" unless 'HASH' eq ref $args;
148 3         7 foreach my $arg (@required) {
149 5 100 66     37 die "Missing required '$arg' argument.\n" unless defined $args->{$arg} and length $args->{$arg};
150             }
151 1         3 my $self = {};
152 1         3 foreach my $arg ( keys %{$args} ) {
  1         5  
153 3 50       9 next unless exists $validate{$arg};
154 3 50       11 die "Value of '$arg' parameter ($args->{$arg}) is not valid.\n"
155             unless $validate{$arg}->( $args->{$arg} );
156 3         8 $self->{$arg} = $args->{$arg};
157             }
158 1 50 0     6 $self->{sdir} ||= $self->{qdir} if $self->{sname};
159 1   50     5 $self->{out} ||= \*STDOUT;
160              
161 1         6 return bless $self, $class;
162             }
163              
164             sub run {
165 0     0 1 0 my ( $self, $cmd, @args ) = @_;
166 0 0       0 die "No command supplied to run.\n" unless $cmd;
167 0 0       0 die "Unrecognized command '$cmd' to run.\n" unless exists $commands{$cmd};
168              
169 0         0 $commands{$cmd}->{code}->( $self, $self->{out}, $self->_get_queue(), $self->_get_scheduler(), @args );
170             }
171              
172             sub synopsis {
173 15     15 1 13596 my ( $self, $cmd ) = @_;
174              
175 15 100 66     110 if ( $cmd && exists $commands{$cmd} ) {
176 14         58 return $commands{$cmd}->{'synopsis'}, '';
177             }
178 1         12 return map { $commands{$_}->{'synopsis'}, '' } sort keys %commands;
  14         29  
179             }
180              
181             sub help {
182 15     15 1 9452 my ( $self, $cmd ) = @_;
183 15 100 66     81 if ( $cmd && exists $commands{$cmd} ) {
184 14         17 return @{ $commands{$cmd} }{ 'synopsis', 'help' }, '';
  14         75  
185             }
186 1         7 return map { @{ $commands{$_} }{ 'synopsis', 'help' }, '' } sort keys %commands;
  14         14  
  14         31  
187             }
188              
189             sub _get_queue {
190 0     0     my ($self) = @_;
191 0 0         return cPanel::TaskQueue->new(
    0          
192             {
193             name => $self->{qname},
194             state_dir => $self->{qdir},
195             ( exists $self->{logger} ? ( logger => $self->{logger} ) : () ),
196             ( defined $self->{serial} ? ( serial => $format{ lc $self->{serial} } ) : () ),
197             }
198             );
199             }
200              
201             sub _get_scheduler {
202 0     0     my ($self) = @_;
203              
204             # Explicitly returning undef because should only be called in scalar context.
205             # I want it to either return a scheduler or undef, returning an empty list
206             # never makes sense in this situation.
207 0 0         return undef unless exists $self->{sdir}; ## no critic (ProhibitExplicitReturnUndef)
208 0 0         return cPanel::TaskQueue::Scheduler->new(
    0          
209             {
210             name => $self->{sname},
211             state_dir => $self->{sdir},
212             ( exists $self->{logger} ? ( logger => $self->{logger} ) : () ),
213             ( defined $self->{serial} ? ( serial => $format{ lc $self->{serial} } ) : () ),
214             }
215             );
216             }
217              
218             sub queue_tasks {
219 0     0 1   my ( $ctrl, $fh, $queue, $sched, @cmds ) = @_;
220 0 0         die "No command to queue.\n" unless @cmds;
221              
222 0           foreach my $cmdstring (@cmds) {
223             eval {
224 0           print $fh "Id: ", $queue->queue_task($cmdstring), "\n";
225 0           1;
226 0 0         } or do {
227 0           print $fh "ERROR: $@\n";
228             };
229             }
230 0           return;
231             }
232              
233             sub unqueue_tasks {
234 0     0 1   my ( $ctrl, $fh, $queue, $sched, @tids ) = @_;
235 0 0         die "No task ids to unqueue.\n" unless @tids;
236              
237 0           my $count = 0;
238 0           foreach my $id (@tids) {
239             eval {
240 0 0         ++$count if $queue->unqueue_task($id);
241 0           1;
242 0 0         } or do {
243 0           print $fh "ERROR: $@\n";
244             };
245             }
246 0           print $fh "$count tasks unqueued\n";
247 0           return;
248             }
249              
250             sub schedule_tasks {
251 0     0 1   my ( $ctrl, $fh, $queue, $sched, $subcmd, @cmds ) = @_;
252 0 0         die "No command to schedule.\n" unless defined $subcmd;
253              
254 0           my $args = {};
255 0 0         if ( $subcmd eq 'at' ) {
    0          
256 0           $args->{'at_time'} = shift @cmds;
257             }
258             elsif ( $subcmd eq 'after' ) {
259 0           $args->{'delay_seconds'} = shift @cmds;
260             }
261             else {
262 0           unshift @cmds, $subcmd;
263             }
264              
265 0 0         die "No command to schedule.\n" unless @cmds;
266 0           foreach my $cmdstring (@cmds) {
267             eval {
268 0           print $fh "Id: ", $sched->schedule_task( $cmdstring, $args ), "\n";
269 0           1;
270 0 0         } or do { print $fh "ERROR: $@\n"; };
  0            
271             }
272 0           return;
273             }
274              
275             sub unschedule_tasks {
276 0     0 1   my ( $ctrl, $fh, $queue, $sched, @tids ) = @_;
277 0 0         die "No task ids to unschedule.\n" unless @tids;
278              
279 0           my $count = 0;
280 0           foreach my $id (@tids) {
281             eval {
282 0 0         ++$count if $sched->unschedule_task($id);
283 0           1;
284 0 0         } or do {
285 0           print $fh "ERROR: $@\n";
286             };
287             }
288 0           print $fh "$count tasks unscheduled\n";
289 0           return;
290             }
291              
292             sub _any_is {
293 0     0     my $match = shift;
294 0 0         return unless defined $match;
295 0           foreach (@_) {
296 0 0         return 1 if $match eq $_;
297             }
298 0           return;
299             }
300              
301             sub find_task {
302 0     0 1   my ( $ctrl, $fh, $queue, $sched, $subcmd, $match ) = @_;
303              
304 0 0         if ( !defined $match ) {
305 0           print $fh "No matching criterion.\n";
306 0           return;
307             }
308              
309 0           my @t;
310 0 0         if ( $subcmd eq 'task' ) {
    0          
311 0           @t = $queue->find_task($match);
312             }
313             elsif ( $subcmd eq 'command' ) {
314 0           @t = $queue->find_commands($match);
315             }
316             else {
317 0           print $fh "'$subcmd' is not a valid find type.\n";
318 0           return;
319             }
320 0 0         if (@t) {
321 0           foreach (@t) {
322 0           _verbosely_print_task( $fh, $_ );
323 0           print $fh "\n";
324             }
325             }
326             else {
327 0           print $fh "No matching task found.\n";
328             }
329 0           return;
330             }
331              
332             sub list_tasks {
333 0     0 1   my ( $ctrl, $fh, $queue, $sched, @subcmds ) = @_;
334 0           my $print = \&_print_task;
335 0 0         if ( _any_is( 'verbose', @subcmds ) ) {
336 0           $print = \&_verbosely_print_task;
337 0           @subcmds = grep { $_ ne 'verbose' } @subcmds;
  0            
338             }
339              
340 0 0         @subcmds = qw/active deferred waiting scheduled/ unless @subcmds;
341 0           my $lists = $queue->snapshot_task_lists;
342              
343 0 0         if ( _any_is( 'active', @subcmds ) ) {
344 0           print $fh "Active Tasks\n-------------\n";
345 0 0         if ( @{ $lists->{'processing'} } ) {
  0            
346 0           foreach my $t ( @{ $lists->{'processing'} } ) {
  0            
347 0           $print->( $fh, $t );
348             }
349             }
350             }
351              
352 0 0         if ( _any_is( 'deferred', @subcmds ) ) {
353 0           print $fh "Deferred Tasks\n-------------\n";
354 0 0         if ( @{ $lists->{'deferred'} } ) {
  0            
355 0           foreach my $t ( @{ $lists->{'deferred'} } ) {
  0            
356 0           $print->( $fh, $t );
357 0           print $fh "\n";
358             }
359             }
360             }
361              
362 0 0         if ( _any_is( 'waiting', @subcmds ) ) {
363 0           print $fh "Waiting Tasks\n-------------\n";
364 0 0         if ( @{ $lists->{'waiting'} } ) {
  0            
365 0           foreach my $t ( @{ $lists->{'waiting'} } ) {
  0            
366 0           $print->( $fh, $t );
367 0           print $fh "\n";
368             }
369             }
370             }
371              
372 0 0         return unless $sched;
373 0 0         if ( _any_is( 'scheduled', @subcmds ) ) {
374 0           my $sched_tasks = $sched->snapshot_task_schedule();
375 0           print $fh "Scheduled Tasks\n---------------\n";
376 0 0         if ( @{$sched_tasks} ) {
  0            
377 0           foreach my $st ( @{$sched_tasks} ) {
  0            
378 0           $print->( $fh, $st->{task} );
379 0           print $fh "\tScheduled for: ", scalar( localtime $st->{time} ), "\n";
380 0           print $fh "\n";
381             }
382             }
383             }
384 0           return;
385             }
386              
387             sub list_plugins {
388 0     0 1   my ( $ctrl, $fh, $queue, $sched, $verbosity ) = @_;
389              
390 0 0 0       if ( defined $verbosity && $verbosity eq 'verbose' ) {
391 0           my $plugins = cPanel::TaskQueue::PluginManager::get_plugins_hash();
392 0           foreach my $plug ( sort keys %{$plugins} ) {
  0            
393 0           print $fh "* $plug\n\t", join( "\n\t", map { "- $_" } sort @{ $plugins->{$plug} } ), "\n\n";
  0            
  0            
394             }
395             }
396             else {
397 0           print $fh join( "\n", map { "* $_" } cPanel::TaskQueue::PluginManager::list_loaded_plugins() ), "\n\n";
  0            
398             }
399 0           return;
400             }
401              
402             sub list_commands {
403 0     0 1   my ( $ctrl, $fh, $queue, $sched, $module ) = @_;
404              
405 0           my $plugins = cPanel::TaskQueue::PluginManager::get_plugins_hash();
406 0 0         if ( !defined $module ) {
    0          
407 0           my @commands = sort map { @{$_} } values %{$plugins};
  0            
  0            
  0            
408 0           print $fh join( "\n", ( map { "* $_" } @commands ) ), "\n\n";
  0            
409             }
410             elsif ( exists $plugins->{$module} ) {
411 0           my @commands = sort @{ $plugins->{$module} };
  0            
412 0           print $fh join( "\n", ( map { "* $_" } @commands ) ), "\n\n";
  0            
413             }
414             else {
415 0           print $fh "No module named $module was loaded.\n";
416             }
417 0           return;
418             }
419              
420             sub queue_status {
421 0     0 1   my ( $ctrl, $fh, $queue, $sched ) = @_;
422              
423 0           print $fh "Queue:\n";
424 0           print $fh "\tQueue Name:\t", $queue->get_name, "\n";
425 0           print $fh "\tDef. Timeout:\t", $queue->get_default_timeout, "\n";
426 0           print $fh "\tMax Timeout:\t", $queue->get_max_timeout, "\n";
427 0           print $fh "\tMax # Running:\t", $queue->get_max_running, "\n";
428 0           print $fh "\tChild Timeout:\t", $queue->get_default_child_timeout, "\n";
429 0           print $fh "\tProcessing:\t", $queue->how_many_in_process, "\n";
430 0           print $fh "\tQueued:\t\t", $queue->how_many_queued, "\n";
431 0           print $fh "\tDeferred:\t", $queue->how_many_deferred, "\n";
432 0 0         print $fh "\tPaused:\t\t", ( $queue->is_paused() ? 'yes' : 'no' ), "\n";
433              
434 0 0         if ( defined $sched ) {
435 0           print $fh "Scheduler:\n";
436 0           print $fh "\tSchedule Name:\t", $sched->get_name, "\n";
437 0           print $fh "\tScheduled:\t", $sched->how_many_scheduled, "\n";
438 0           my $seconds = $sched->seconds_until_next_task;
439 0 0         print $fh "\tTime to next:\t$seconds\n" if defined $seconds;
440             }
441 0           print $fh "\n";
442 0           return;
443             }
444              
445             sub convert_state_files {
446 0     0 1   my ( $ctrl, $fh, $queue, $sched, $fmt ) = @_;
447              
448 0           $fmt = lc $fmt;
449 0 0         unless ( exists $format{$fmt} ) {
450 0           print $fh "'$fmt' is not a valid format.\n";
451 0           return;
452             }
453 0           my $new_serial = $format{$fmt};
454 0           eval "use $new_serial;";
455 0 0         die "Unable to load serializer module '$new_serial': $@" if $@;
456 0           _convert_a_state_file( $queue, $new_serial );
457 0           _convert_a_state_file( $sched, $new_serial );
458 0           print $fh "Since the format of the state files have changed, don't forget to change the serialization format in other programs.\n";
459 0           $ctrl->{serial} = $fmt;
460 0           return;
461             }
462              
463             sub _convert_a_state_file {
464 0     0     my ( $q, $new_serial ) = @_;
465              
466 0           my $curr_serial = $q->_serializer();
467 0 0         if ( $new_serial ne $curr_serial ) {
468 0           my $curr_state_file = $q->_state_file();
469 0           my $new_state_file = $new_serial->filename( substr( $curr_state_file, 0, rindex( $curr_state_file, '.' ) ) );
470 0 0         open my $ifh, '<', $curr_state_file or die "Unable to read '$curr_state_file': $!\n";
471 0 0         open my $ofh, '>', $new_state_file or die "Unable to write '$new_state_file': $!\n";
472 0           $new_serial->save( $ofh, $curr_serial->load($ifh) );
473 0           close $ofh;
474 0           close $ifh;
475 0           unlink "$curr_state_file.orig";
476 0           rename $curr_state_file, "$curr_state_file.orig";
477             }
478             }
479              
480             sub display_queue_info {
481 0     0 1   my ( $ctrl, $fh, $queue, $sched, @args ) = @_;
482 0           print $fh "Current TaskQueue Information\n";
483 0           print $fh "Serializer: $ctrl->{serial} ($format{lc $ctrl->{serial}})\n";
484 0           print $fh "TaskQueue file: ", $queue->_state_file(), "\n";
485 0           print $fh "Scheduler file: ", $sched->_state_file(), "\n";
486 0           return;
487             }
488              
489             sub process_one_step {
490 0     0 1   my ( $ctrl, $fh, $queue, $sched, @args ) = @_;
491 0           my $argcnt = @args;
492 0           @args = grep { 'verbose' ne $_ } @args;
  0            
493 0           my $verbose = $argcnt > @args;
494 0 0         @args = qw/scheduled waiting/ unless grep { 'scheduled' eq $_ or 'waiting' eq $_ } @args;
  0 0          
495             eval {
496 0 0         if ( _any_is( 'scheduled', @args ) ) {
497 0           my $cnt = $sched->process_ready_tasks($queue);
498 0 0         if ($cnt) {
499 0 0         print $fh "$cnt scheduled tasks moved to queue.\n" if $verbose;
500             }
501             else {
502 0 0         print $fh "No scheduled tasks ready to queue.\n" if $verbose;
503             }
504             }
505 0 0         if ( _any_is( 'waiting', @args ) ) {
506 0 0         if ( $queue->has_work_to_do() ) {
507 0           $queue->process_next_task();
508 0 0         print "Activated a queued task.\n" if $verbose;
509             }
510             else {
511 0 0         print "No work to do at this time.\n" if $verbose;
512             }
513             }
514 0           1;
515 0 0         } or do {
516 0           print $fh "Exception detected: $@";
517             };
518             }
519              
520             sub _print_task {
521 0     0     my ( $fh, $task ) = @_;
522 0           print $fh '[', $task->uuid, ']: ', $task->full_command, "\n";
523 0           print $fh "\tQueued: ", scalar( localtime $task->timestamp ), "\n";
524 0 0         print $fh "\tStarted: ", scalar( localtime $task->started ), "\n" if defined $task->started;
525             }
526              
527             sub _verbosely_print_task {
528 0     0     my ( $fh, $task ) = @_;
529 0           print $fh '[', $task->uuid, ']: ', $task->full_command, "\n";
530 0           print $fh "\tQueued: ", scalar( localtime $task->timestamp ), "\n";
531 0 0         print $fh "\tStarted: ", ( defined $task->started ? scalar( localtime $task->started ) : 'N/A' ), "\n";
532 0           print $fh "\tChild Timeout: ", $task->child_timeout, " secs\n";
533 0   0       print $fh "\tPID: ", ( $task->pid || 'None' ), "\n";
534 0           print $fh "\tRemaining Retries: ", $task->retries_remaining, "\n";
535             }
536              
537             1;
538              
539             __END__