File Coverage

blib/lib/MapReduce/Framework/Simple.pm
Criterion Covered Total %
statement 69 217 31.8
branch 11 60 18.3
condition 2 13 15.3
subroutine 12 20 60.0
pod 4 4 100.0
total 98 314 31.2


line stmt bran cond sub pod time code
1             package MapReduce::Framework::Simple;
2 6     6   69898 use 5.010001;
  6         24  
3 6     6   40 use strict;
  6         17  
  6         160  
4 6     6   34 use warnings;
  6         13  
  6         138  
5 6     6   29 use B::Deparse;
  6         21  
  6         135  
6 6     6   2280 use Mouse;
  6         127501  
  6         24  
7 6     6   4180 use Data::MessagePack;
  6         4965  
  6         146  
8 6     6   2721 use Parallel::ForkManager;
  6         159661  
  6         178  
9 6     6   2394 use Plack::Request;
  6         258057  
  6         176  
10 6     6   3588 use WWW::Mechanize;
  6         543097  
  6         270  
11 6     6   90 use List::Util qw(shuffle);
  6         13  
  6         10192  
12              
13             our $VERSION = "0.07";
14              
15             has 'verify_hostname' => (is => 'rw', isa => 'Int', default => 1);
16             has 'skip_undef_result' => (is => 'rw', isa => 'Int', default => 1);
17             has 'warn_discarded_data' => (is => 'rw', isa => 'Int', default => 1);
18             has 'die_discarded_data' => (is => 'rw', isa => 'Int', default => 0);
19             has 'worker_log' => (is => 'rw', isa => 'Int', default => 0);
20             has 'force_plackup' => (is => 'rw', isa => 'Int', default => 0);
21              
22             # To make load balanced data.
23             sub create_assigned_data {
24 0     0 1 0 my $self = shift;
25 0         0 my $data = shift;
26 0         0 my $servers = shift;
27 0         0 my $options = shift;
28 0         0 my $chunk_num = 10;
29 0         0 my $method = 'volume_uniform';
30 0 0       0 if(defined($options)){
31 0 0       0 if(defined($options->{chunk_num})){
32 0         0 $chunk_num = $options->{chunk_num};
33             }
34 0 0       0 if(defined($options->{method})){
35 0         0 $method = $options->{method};
36             }
37             }
38 0         0 my $output;
39 0 0       0 if($method eq 'element_shuffle'){
    0          
40              
41 0         0 @$data = shuffle(@$data);
42 0         0 for(0 .. $#$data){
43 0         0 push(@{$output->[$_ % $chunk_num]->[0]},$data->[$_]);
  0         0  
44             }
45 0         0 for(0 .. $#$output){
46 0         0 $output->[$_]->[1] = $servers->[$_ % scalar(@$servers)];
47             }
48             }elsif($method eq 'element_sequential'){
49 0         0 for(0 .. $#$data){
50 0         0 push(@{$output->[$_ % $chunk_num]->[0]},$data->[$_]);
  0         0  
51             }
52 0         0 for(0 .. $#$output){
53 0         0 $output->[$_]->[1] = $servers->[$_ % scalar(@$servers)];
54             }
55             }else{
56 0         0 my $mp = Data::MessagePack->new();
57             @$data = map $_->[0],
58 0         0 sort {$a->[1] <=> $b->[1]} map [$_, bytes::length $mp->pack($_)],
  0         0  
59             @$data;
60 0         0 for(0 .. $#$data){
61 0         0 push(@{$output->[$_ % $chunk_num]->[0]},$data->[$_]);
  0         0  
62             }
63 0         0 for(0 .. $#$output){
64 0         0 $output->[$_]->[1] = $servers->[$_ % scalar(@$servers)];
65             }
66             }
67 0         0 return($output);
68             }
69              
70             # MapReduce client(Master)
71             sub map_reduce {
72 5     5 1 29845 my $self = shift;
73 5         20 my $data = shift;
74 5         15 my $mapper_ref = shift;
75 5         20 my $reducer_ref = shift;
76 5         15 my $max_proc = shift;
77 5         20 my $options = shift;
78 5         10 my $remote_flg = 1;
79 5 50 33     50 if(defined($options) and defined($options->{remote})){
80 5         20 $remote_flg = $options->{remote};
81             }
82 5         17080 my $stringified_code = B::Deparse->new->coderef2text($mapper_ref);
83 5         65 my $result;
84             my $succeeded_remotes;
85 5         0 my $failed_remotes;
86 5         0 my $failed_data;
87 5         0 my $discarded_data;
88 5         60 my $pm = Parallel::ForkManager->new($max_proc);
89             $pm->run_on_finish(
90             sub {
91 4     4   1002949 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure) = @_;
92 4 50       28 if (defined $data_structure) {
93 4 50       20 if($data_structure->{is_success} == 1){
94 4         15 $succeeded_remotes->{$data_structure->{remote}} = 1;
95 4         21 $result->[$data_structure->{id}] = $data_structure->{result};
96             }else{
97 0         0 $failed_remotes->{$data_structure->{remote}} = 1;
98 0         0 push(@$failed_data,$data_structure->{failed_data});
99 0         0 $result->[$data_structure->{id}] = undef;
100             }
101             }
102             }
103 5         3070 );
104 5 50       75 if($remote_flg == 1){
105 0         0 for(my $k=0; $k <= $#$data; $k++){
106 0 0       0 $pm->start and next;
107 0         0 my $payload = _perl_to_msgpack(
108             {
109             data => $data->[$k]->[0],
110             code => $stringified_code
111             }
112             );
113 0         0 my $result_chil_from_remote = _post_content(
114             $data->[$k]->[1],
115             'application/x-msgpack; charset=x-user-defined',
116             $payload,
117             $self->verify_hostname
118             );
119 0         0 my $result_with_id;
120 0 0       0 if($result_chil_from_remote->{is_success}){
121 0         0 my $result_chil = _msgpack_to_perl($result_chil_from_remote->{res});
122 0         0 $result_with_id = {id => $k, result => $result_chil->{result}, remote => $data->[$k]->[1], is_success => 1};
123             }else{
124 0         0 $result_with_id = {id => $k, remote => $data->[$k]->[0], is_success => 0, failed_data => $data->[$k]};
125             }
126 0         0 $pm->finish(0,$result_with_id);
127             }
128             }else{
129 5         30 for(my $k=0; $k <= $#$data; $k++){
130 14 100       9848 $pm->start and next;
131 4         4493 my $result_chil = $mapper_ref->($data->[$k]);
132 4         5257 my $result_with_id = {id => $k, result => $result_chil, is_success => 1, remote => 'LOCAL'};
133 4         127 $pm->finish(0,$result_with_id);
134             }
135             }
136 1         1088 $pm->wait_all_children;
137 1         10 my $result_failover;
138 1 50 33     26 if($remote_flg == 1 and $#$failed_data >= 0){
139 0         0 my @succeeded_remotes_list;
140 0         0 foreach my $key (keys %$succeeded_remotes){
141 0         0 push(@succeeded_remotes_list,$key);
142             }
143 0         0 my $pm2 = Parallel::ForkManager->new($max_proc);
144             $pm2->run_on_finish(
145             sub {
146 0     0   0 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure) = @_;
147 0 0       0 if (defined $data_structure) {
148 0 0       0 if($data_structure->{is_success} == 1){
149 0         0 $succeeded_remotes->{$data_structure->{remote}} = 1;
150 0         0 $result_failover->[$data_structure->{id}] = $data_structure->{result};
151             }else{
152 0         0 $failed_remotes->{$data_structure->{remote}} = 1;
153 0         0 push(@$discarded_data,$data_structure->{failed_data});
154 0         0 $result_failover->[$data_structure->{id}] = undef;
155             }
156             }
157             }
158 0         0 );
159              
160 0         0 for(my $k=0; $k <= $#$failed_data; $k++){
161 0 0       0 $pm2->start and next;
162 0         0 my $payload = _perl_to_msgpack(
163             {
164             data => $failed_data->[$k]->[0],
165             code => $stringified_code
166             }
167             );
168 0         0 my $rand_remote = $succeeded_remotes_list[int(rand($#succeeded_remotes_list))];
169 0         0 my $result_chil_from_remote = _post_content(
170             $rand_remote,
171             'application/x-msgpack; charset=x-user-defined',
172             $payload,
173             $self->verify_hostname
174             );
175 0         0 my $result_with_id;
176 0 0       0 if($result_chil_from_remote->{is_success}){
177 0         0 my $result_chil = _msgpack_to_perl($result_chil_from_remote->{res});
178 0         0 $result_with_id = {id => $#$data + $k, result => $result_chil->{result}, remote => $rand_remote, is_success => 1};
179             }else{
180 0         0 $result_with_id = {id => $#$data + $k, remote => $rand_remote, is_success => 0, failed_data => $failed_data->[$k]};
181             }
182 0         0 $pm2->finish(0,$result_with_id);
183             }
184 0         0 $pm2->wait_all_children;
185             }
186 1         3 my $result_merged;
187 1         4 push(@$result_merged,@$result);
188 1 50       5 if($#$result_failover >= 0){
189 0         0 push(@$result_merged,@$result_failover);
190             }
191 1 50       7 if($#$discarded_data >= 0){
192 0 0       0 if($self->die_discarded_data == 1){
    0          
193 0         0 die "Fatal: Discarded data exist due to remote server couldn't process requested data.\n";
194             }elsif($self->warn_discarded_data == 1){
195 0         0 warn "Warning: Discarded data exist.\n";
196             }
197             }
198 1 50       12 if($self->skip_undef_result == 1){
199 1         4 my $result_skip;
200 1         6 for(0 .. $#$result_merged){
201 4 50       13 if(defined($result_merged->[$_])){
202 4         10 push(@$result_skip,$result_merged->[$_]);
203             }
204             }
205 1         9 return($reducer_ref->($result_skip));
206             }else{
207 0           return($reducer_ref->($result_merged));
208             }
209             }
210              
211             sub worker {
212 0     0 1   my $self = shift;
213 0           my $path = shift;
214 0           my $worker = shift;
215 0           my $port = shift;
216 0 0         unless(defined($worker)){
217 0           $worker = 4;
218             }
219 0 0         unless(defined($port)){
220 0           $port = 5000;
221             }
222 0           my $rc = eval{
223 0           require Plack::Handler::Starlet;
224 0           1;
225             };
226 0 0 0       if($rc and $self->force_plackup == 0){
227 0           print "Starting MapReduce Framework Worker by Starlet\n";
228 0           print "Path: $path\nPort: $port\n";
229 0           my $app = $self->load_worker_plack_app($path);
230 0           my $handler = Plack::Handler::Starlet->new(
231             max_workers => $worker,
232             port => $port
233             );
234 0           $handler->run($app);
235             }else{
236 0           require Plack::Runner;
237 0           my $runner = Plack::Runner->new;
238 0           print "Starting MapReduce Framework Worker by plackup. The number of workers will be ignored.\n";
239 0           print "Path: $path\nPort: $port\n";
240 0           my $app = $self->load_worker_plack_app($path);
241 0           $runner->parse_options('--port',$port);
242 0           $runner->run($app);
243             }
244             }
245              
246             sub load_worker_plack_app {
247 0     0 1   my $self = shift;
248 0           my $path = shift;
249             my $app = sub {
250 0     0     my $env = shift;
251 0           my $req = Plack::Request->new($env);
252 0 0         if($self->worker_log == 1){
253 0           print "REQ,$$,".$req->address.',';
254 0           my @tar = localtime(time());
255 0           printf(
256             "%04d-%02d-%02d %02d:%02d:%02d",
257             $tar[5]+1900,$tar[4]+1,$tar[3],$tar[2],$tar[1],$tar[0]
258             );
259 0           print "\n";
260             }
261             my $response = {
262             $path => sub {
263 0   0       my $msg_req = $req->content //
264             return [400,['Content-Type' => 'text/plain'],['Content body required.']];
265 0   0       my $perl_req = _msgpack_to_perl($msg_req) //
266             return [400,['Content-Type' => 'text/plain'],['Valid MessagePack required']];
267 0           my $data = $perl_req->{data};
268 0           my $code_text = $perl_req->{code};
269 0           my $code_ref;
270 0           eval('$code_ref = sub '.$code_text.';');
271 0           my $result = $code_ref->($data);
272 0           return [200,['Content-Type' => 'application/x-msgpack; charset=x-user-defined'],[_perl_to_msgpack({result => $result})]];
273             }
274 0           };
275 0 0         if($self->worker_log == 1){
276 0           print "END,$$,".$req->address.',';
277 0           my @tar = localtime(time());
278 0           printf(
279             "%04d-%02d-%02d %02d:%02d:%02d",
280             $tar[5]+1900,$tar[4]+1,$tar[3],$tar[2],$tar[1],$tar[0]
281             );
282 0           print "\n";
283             }
284 0 0         if(defined($response->{$env->{PATH_INFO}})){
285 0           return $response->{$env->{PATH_INFO}}->();
286             }else{
287 0           return [404,['Content-Type' => 'text/plain'],['Not Found']];
288             }
289              
290 0           };
291 0           return($app);
292             }
293              
294              
295             sub _post_content {
296 0     0     my $url = shift;
297 0           my $content_type = shift;
298 0           my $data = shift;
299 0           my $ssl_opt = shift;
300 0           my $ua = WWW::Mechanize->new(
301             ssl_opts => {
302             verify_hostname => $ssl_opt
303             }
304             );
305 0           my $is_success = 1;
306 0           eval{
307 0           $ua->post($url,'Content-Type' => $content_type, Content => $data);
308             };
309 0 0         if($@){
310 0           $is_success = 0;
311             }
312 0           my $res = $ua->content();
313 0           return {res => $res, is_success => $is_success};
314             }
315              
316             sub _perl_to_msgpack {
317 0     0     my $data = shift;
318 0           my $msgpack = Data::MessagePack->new();
319 0           my $packed = $msgpack->pack($data);
320 0           return($packed);
321             }
322              
323             sub _msgpack_to_perl {
324 0     0     my $msg_text = shift;
325 0           my $msgpack = Data::MessagePack->new();
326 0           my $unpacked = $msgpack->unpack($msg_text);
327 0           return($unpacked);
328             }
329              
330              
331              
332             __PACKAGE__->meta->make_immutable();
333              
334             1;
335             __END__