File Coverage

blib/lib/Metabrik/Worker/Parallel.pm
Criterion Covered Total %
statement 9 49 18.3
branch 0 14 0.0
condition 0 3 0.0
subroutine 3 9 33.3
pod 2 6 33.3
total 14 81 17.2


line stmt bran cond sub pod time code
1             #
2             # $Id$
3             #
4             # worker::parallel Brik
5             #
6             package Metabrik::Worker::Parallel;
7 1     1   1058 use strict;
  1         3  
  1         30  
8 1     1   5 use warnings;
  1         2  
  1         40  
9              
10 1     1   7 use base qw(Metabrik);
  1         7  
  1         503  
11              
12             sub brik_properties {
13             return {
14 0     0 1   revision => '$Revision$',
15             tags => [ qw(unstable) ],
16             author => 'GomoR ',
17             license => 'http://opensource.org/licenses/BSD-3-Clause',
18             attributes => {
19             pool_size => [ qw(count) ],
20             pid => [ qw(pid) ], # pid is available within son process only
21             manager => [ qw(INTERNAL) ],
22             },
23             attributes_default => {
24             pool_size => 10,
25             },
26             commands => {
27             create_manager => [ qw(pool_size|OPTIONAL) ],
28             reset_manager => [ ],
29             start => [ qw(sub) ],
30             stop => [ ],
31             },
32             require_modules => {
33             'Parallel::ForkManager' => [ ],
34             },
35             };
36             }
37              
38             sub create_manager {
39 0     0 0   my $self = shift;
40 0           my ($pool_size) = @_;
41              
42             # Do not create another manager if one already exists.
43 0           my $manager = $self->manager;
44 0 0         if (defined($manager)) {
45 0           return 1;
46             }
47              
48 0   0       $pool_size ||= $self->pool_size;
49              
50 0           $manager = Parallel::ForkManager->new(
51             $pool_size,
52             );
53              
54 0           $self->manager($manager);
55              
56 0           return 1;
57             }
58              
59             sub reset_manager {
60 0     0 0   my $self = shift;
61              
62 0           my $manager = $self->manager;
63 0 0         if (! defined($manager)) {
64 0           return 1;
65             }
66              
67 0           $manager->wait_all_children;
68 0           $self->manager(undef);
69              
70 0           return 1;
71             }
72              
73             sub start {
74 0     0 0   my $self = shift;
75 0           my ($sub) = @_;
76              
77 0 0         $self->brik_help_run_undef_arg('start', $sub) or return;
78 0 0         my $ref = $self->brik_help_run_invalid_arg('start', $sub, 'CODE')
79             or return;
80              
81 0 0         $self->create_manager or return;
82 0           my $manager = $self->manager;
83              
84 0           my $pid = $manager->start;
85 0 0         if ($pid) { # Success, return to parent
86             # This is the child pid, to be used by parent:
87 0           $self->pid($pid);
88 0           return $pid;
89             }
90              
91             # Continue within son
92 0           $self->pid($$); # This pid is the child pid, gathered from within child
93 0           &{$sub}();
  0            
94              
95 0           $manager->finish;
96              
97 0           return 0;
98             }
99              
100             sub stop {
101 0     0 0   my $self = shift;
102              
103 0           my $manager = $self->manager;
104 0 0         if (defined($manager)) {
105 0           $manager->wait_all_children;
106             }
107              
108 0           return 1;
109             }
110              
111             sub brik_fini {
112 0     0 1   my $self = shift;
113              
114 0           $self->reset_manager;
115              
116 0           return $self->SUPER::brik_fini;
117             }
118              
119             1;
120              
121             __END__