File Coverage

blib/lib/Job/Manager.pm
Criterion Covered Total %
statement 11 13 84.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 16 18 88.8


line stmt bran cond sub pod time code
1             package Job::Manager;
2             {
3             $Job::Manager::VERSION = '0.16';
4             }
5             BEGIN {
6 1     1   30903 $Job::Manager::AUTHORITY = 'cpan:TEX';
7             }
8             # ABSTRACT: a parallel job execution manager
9              
10 1     1   32 use 5.010_000;
  1         4  
  1         34  
11 1     1   901 use mro 'c3';
  1         822  
  1         8  
12 1     1   45 use feature ':5.10';
  1         2  
  1         112  
13              
14 1     1   494 use Moose;
  0            
  0            
15             use namespace::autoclean;
16              
17             # use IO::Handle;
18             # use autodie;
19             # use MooseX::Params::Validate;
20              
21             use Sys::ForkQueue;
22              
23             has 'concurrency' => (
24             'is' => 'rw',
25             'isa' => 'Int',
26             'required' => 1,
27             );
28              
29             has 'jobs' => (
30             'is' => 'ro',
31             'isa' => 'ArrayRef[Job::Manager::Job]',
32             'default' => sub { [] },
33             );
34              
35             with qw(Log::Tree::RequiredLogger);
36              
37             sub add {
38             my $self = shift;
39             my $job = shift;
40              
41             if ( $job && ref($job) && $job->isa('Job::Manager::Job') ) {
42             push( @{ $self->jobs() }, $job );
43             return 1;
44             }
45             else {
46             $self->logger()->log( message => 'Job is not a subtype of Job::Manager::Job but ' . ref($job) . '. Can not add it.', level => 'warning', );
47             return;
48             }
49             }
50              
51             sub add_batch {
52             my ( $self, @jobs ) = @_;
53              
54             my $i = 0;
55             foreach my $job (@jobs) {
56             $self->add($job) and $i++;
57             }
58             return $i;
59             }
60              
61             # run a single job
62             sub _exec {
63             my $self = shift;
64             my $num = shift;
65              
66             return unless defined($num);
67              
68             $self->logger()->log( message => 'Running Job #' . $num, level => 'debug', );
69              
70             if ( $self->jobs() && $self->jobs()->[$num] ) {
71             $self->logger()->log( message => 'Running Job #' . $num, level => 'debug', );
72              
73             # detach any ressources this job may have shared w/ the parent
74             # this are e.g. filehandles, dbhandles or logfiles
75             $self->jobs()->[$num]->forked();
76             return $self->jobs()->[$num]->run();
77             }
78             else {
79             $self->logger()->log( message => 'Job #' . $num . ' not found.', level => 'warning', );
80             return;
81             }
82             }
83              
84             sub run {
85             my $self = shift;
86              
87             # each job needs a unique name, just use montonous increasing numbers
88             my $i = 0;
89             my @jobs = map { $i++ } @{ $self->jobs() };
90             my $FQ = Sys::ForkQueue::->new(
91             {
92             'jobs' => \@jobs,
93             'code' => sub { $self->_exec(@_); },
94             'logger' => $self->logger(),
95             'concurrency' => $self->concurrency(),
96             'redirect_output' => 0,
97             },
98             );
99             return $FQ->run();
100             }
101              
102             no Moose;
103             __PACKAGE__->meta->make_immutable;
104              
105             1;
106              
107             __END__
108              
109             =pod
110              
111             =encoding utf-8
112              
113             =head1 NAME
114              
115             Job::Manager - a parallel job execution manager
116              
117             =head1 SYNOPSIS
118              
119             use Job::Manager;
120             use Job::Manager::Job;
121              
122             my $Mgm = Job::Manager::->new({
123             'logger' => $logger,
124             'concurrency' => '4',
125             });
126             foreach my $i ( 1 .. 60 ) {
127             my $Job = Job::Manager::Job::->new({
128             'logger' => $logger,
129             });
130             $Mgm->add($Job);
131             }
132             $Mgm->run();
133              
134             =head1 METHODS
135              
136             =head2 add
137              
138             Add a single job to the queue.
139              
140             =head2 add_batch
141              
142             Add a list of jobs to the queue.
143              
144             =head2 run
145              
146             Process the whole job queue.
147              
148             =head1 NAME
149              
150             Job::Manager - Parallel job execution manager.
151              
152             =head1 AUTHOR
153              
154             Dominik Schulz <dominik.schulz@gauner.org>
155              
156             =head1 COPYRIGHT AND LICENSE
157              
158             This software is copyright (c) 2012 by Dominik Schulz.
159              
160             This is free software; you can redistribute it and/or modify it under
161             the same terms as the Perl 5 programming language system itself.
162              
163             =cut