File Coverage

blib/lib/MapReduce/Framework/Simple.pm
Criterion Covered Total %
statement 85 335 25.3
branch 13 92 14.1
condition 3 20 15.0
subroutine 16 25 64.0
pod 4 5 80.0
total 121 477 25.3


line stmt bran cond sub pod time code
1             package MapReduce::Framework::Simple;
2 6     6   67151 use 5.010001;
  6         23  
3 6     6   30 use strict;
  6         17  
  6         90  
4 6     6   29 use warnings;
  6         12  
  6         124  
5 6     6   29 use B::Deparse;
  6         36  
  6         118  
6 6     6   2158 use Mouse;
  6         121411  
  6         25  
7 6     6   4530 use Data::MessagePack;
  6         5176  
  6         146  
8 6     6   2546 use Parallel::ForkManager;
  6         156964  
  6         197  
9 6     6   2411 use Plack::Request;
  6         257246  
  6         185  
10 6     6   3757 use WWW::Mechanize;
  6         538492  
  6         241  
11 6     6   72 use List::Util qw(shuffle);
  6         17  
  6         477  
12 6     6   2596 use Data::Serializer;
  6         12433  
  6         157  
13 6     6   42 use Storable;
  6         12  
  6         283  
14 6     6   29 no warnings qw(once);
  6         13  
  6         222  
15             $Storable::Deparse = 1;
16             $Storable::Eval = 1;
17 6     6   35 use warnings;
  6         12  
  6         14645  
18              
19             our $VERSION = "0.09";
20              
21             has 'verify_hostname' => (is => 'rw', isa => 'Int', default => 1);
22             has 'skip_undef_result' => (is => 'rw', isa => 'Int', default => 1);
23             has 'warn_discarded_data' => (is => 'rw', isa => 'Int', default => 1);
24             has 'die_discarded_data' => (is => 'rw', isa => 'Int', default => 0);
25             has 'worker_log' => (is => 'rw', isa => 'Int', default => 0);
26             has 'force_plackup' => (is => 'rw', isa => 'Int', default => 0);
27             has 'server_spec' => (is => 'rw', isa => 'HashRef', default => sub{{cores => 1, clock => 2000, mem => 1}});
28             has 'worker_num' => (is => 'rw', isa => 'Int', default => 1);
29             has 'port' => (is => 'rw', isa => 'Int', default => 5000);
30             has 'path' => (is => 'rw', isa => 'Str', default => '/eval');
31              
32              
33             # To make load balanced data.
34             sub create_assigned_data {
35 0     0 1 0 my $self = shift;
36 0         0 my $data = shift;
37 0         0 my $servers = shift;
38 0         0 my $options = shift;
39 0         0 my $chunk_num = 10;
40 0         0 my $method = 'volume_uniform';
41 0 0       0 if(defined($options)){
42 0 0       0 if(defined($options->{chunk_num})){
43 0         0 $chunk_num = $options->{chunk_num};
44             }
45 0 0       0 if(defined($options->{method})){
46 0         0 $method = $options->{method};
47             }
48             }
49 0         0 my $output;
50 0 0       0 if($method eq 'element_shuffle'){
    0          
    0          
    0          
    0          
51              
52 0         0 @$data = shuffle(@$data);
53 0         0 for(0 .. $#$data){
54 0         0 push(@{$output->[$_ % $chunk_num]->[0]},$data->[$_]);
  0         0  
55             }
56 0         0 for(0 .. $#$output){
57 0         0 $output->[$_]->[1] = $servers->[$_ % scalar(@$servers)];
58             }
59             }elsif($method eq 'element_sequential'){
60 0         0 for(0 .. $#$data){
61 0         0 push(@{$output->[$_ % $chunk_num]->[0]},$data->[$_]);
  0         0  
62             }
63 0         0 for(0 .. $#$output){
64 0         0 $output->[$_]->[1] = $servers->[$_ % scalar(@$servers)];
65             }
66             }elsif($method eq 'element_server_cores'){
67 0         0 @$data = shuffle(@$data);
68 0         0 my $server_spec = $self->check_server_spec($servers);
69 0         0 my $tmp_server_list;
70 0         0 foreach my $key (keys %$server_spec){
71 0         0 push(@$tmp_server_list,$key);
72 0         0 for(2 .. $server_spec->{$key}->{server_spec}->{cores}){
73 0         0 push(@$tmp_server_list,$key);
74             }
75             }
76 0         0 for(0 .. $#$data){
77 0         0 push(@{$output->[$_ % scalar(@$tmp_server_list)]->[0]},$data->[$_]);
  0         0  
78             }
79 0         0 for(0 .. $#$output){
80 0         0 $output->[$_]->[1] = $tmp_server_list->[$_ % scalar(@$tmp_server_list)];
81             }
82             }elsif($method eq 'element_server_workers'){
83 0         0 @$data = shuffle(@$data);
84 0         0 my $server_spec = $self->check_server_spec($servers);
85 0         0 my $tmp_server_list;
86 0         0 foreach my $key (keys %$server_spec){
87 0         0 push(@$tmp_server_list,$key);
88 0         0 for(2 .. $server_spec->{$key}->{server_spec}->{worker_num}){
89 0         0 push(@$tmp_server_list,$key);
90             }
91             }
92 0         0 for(0 .. $#$data){
93 0         0 push(@{$output->[$_ % scalar(@$tmp_server_list)]->[0]},$data->[$_]);
  0         0  
94             }
95 0         0 for(0 .. $#$output){
96 0         0 $output->[$_]->[1] = $tmp_server_list->[$_ % scalar(@$tmp_server_list)];
97             }
98             }elsif($method eq 'element_server_core_clock'){
99 0         0 @$data = shuffle(@$data);
100 0         0 my $server_spec = $self->check_server_spec($servers);
101 0         0 my $tmp_server_list;
102              
103 0         0 my $tmp_max_cc = 0;
104 0         0 my $tmp_min_cc = 1_000_000_000_000;
105 0         0 foreach my $key (keys %$server_spec){
106             my $tmp_cc = ($server_spec->{$key}->{server_spec}->{cores} + 0.0001)
107 0         0 * ($server_spec->{$key}->{server_spec}->{clock} + 0.0001);
108 0 0       0 if($tmp_max_cc < $tmp_cc){
109 0         0 $tmp_max_cc = $tmp_cc;
110             }
111 0 0       0 if($tmp_min_cc > $tmp_cc){
112 0         0 $tmp_min_cc = $tmp_cc;
113             }
114             }
115              
116 0         0 my $pre_ratio = ((log($tmp_max_cc) / log(10))+(log($tmp_max_cc) / log(10)));
117 0         0 my $ratio = 1/ 10 ** int($pre_ratio / 2);
118 0         0 foreach my $key (keys %$server_spec){
119 0         0 push(@$tmp_server_list,$key);
120             my $tmp_cc = $server_spec->{$key}->{server_spec}->{cores}
121 0         0 * $server_spec->{$key}->{server_spec}->{clock};
122 0         0 my $add_num = int($tmp_cc * $ratio);
123 0         0 for(2 .. $add_num){
124 0         0 push(@$tmp_server_list,$key);
125             }
126             }
127 0         0 for(0 .. $#$data){
128 0         0 push(@{$output->[$_ % scalar(@$tmp_server_list)]->[0]},$data->[$_]);
  0         0  
129             }
130 0         0 for(0 .. $#$output){
131 0         0 $output->[$_]->[1] = $tmp_server_list->[$_ % scalar(@$tmp_server_list)];
132             }
133             }else{
134 0         0 my $mp = Data::MessagePack->new();
135             @$data = map $_->[0],
136 0         0 sort {$a->[1] <=> $b->[1]} map [$_, bytes::length $mp->pack($_)],
  0         0  
137             @$data;
138 0         0 for(0 .. $#$data){
139 0         0 push(@{$output->[$_ % $chunk_num]->[0]},$data->[$_]);
  0         0  
140             }
141 0         0 for(0 .. $#$output){
142 0         0 $output->[$_]->[1] = $servers->[$_ % scalar(@$servers)];
143             }
144             }
145 0         0 return($output);
146             }
147              
148             # MapReduce client(Master)
149             sub map_reduce {
150 5     5 1 19315 my $self = shift;
151 5         10 my $data = shift;
152 5         10 my $mapper_ref = shift;
153 5         15 my $reducer_ref = shift;
154 5         25 my $max_proc = shift;
155 5         15 my $options = shift;
156 5         15 my $remote_flg = 1;
157 5 50 33     45 if(defined($options) and defined($options->{remote})){
158 5         10 $remote_flg = $options->{remote};
159             }
160 5         10 my $storable_flg = 0;
161 5 50 33     30 if(defined($options) and defined($options->{storable})){
162 0         0 $storable_flg = $options->{storable};
163             }
164 5         14965 my $stringified_code = B::Deparse->new->coderef2text($mapper_ref);
165 5         65 my $result;
166             my $succeeded_remotes;
167 5         0 my $failed_remotes;
168 5         0 my $failed_data;
169 5         0 my $discarded_data;
170 5         50 my $pm = Parallel::ForkManager->new($max_proc);
171              
172 5         2920 my $ds;
173 5 50       30 if($storable_flg == 1){
174 0         0 $ds = Data::Serializer->new(
175             serializer => 'Storable',
176             compress => 1
177             );
178             }
179              
180             $pm->run_on_finish(
181             sub {
182 4     4   1002563 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure) = @_;
183 4 50       20 if (defined $data_structure) {
184 4 50       16 if($data_structure->{is_success} == 1){
185 4         17 $succeeded_remotes->{$data_structure->{remote}} = 1;
186 4         16 $result->[$data_structure->{id}] = $data_structure->{result};
187             }else{
188 0         0 $failed_remotes->{$data_structure->{remote}} = 1;
189 0         0 push(@$failed_data,$data_structure->{failed_data});
190 0         0 $result->[$data_structure->{id}] = undef;
191             }
192             }
193             }
194 5         50 );
195 5 50       70 if($remote_flg == 1){
196 0         0 for(my $k=0; $k <= $#$data; $k++){
197 0 0       0 $pm->start and next;
198 0         0 my $payload;
199 0 0       0 if($storable_flg == 1){
200 0         0 $payload = $ds->serialize(
201             {
202             data => $data->[$k]->[0],
203             code => $stringified_code
204             }
205             );
206             }else{
207 0         0 $payload = _perl_to_msgpack(
208             {
209             data => $data->[$k]->[0],
210             code => $stringified_code
211             }
212             );
213             }
214 0         0 my $path_add = '';
215 0 0       0 if($storable_flg == 1){ $path_add = '/STORABLE'; };
  0         0  
216 0         0 my $content_type_name = 'application/x-msgpack; charset=x-user-defined';
217 0 0       0 if($storable_flg == 1){ $content_type_name = 'application/octet-stream; charset=x-user-defined'; };
  0         0  
218 0         0 my $result_chil_from_remote = _post_content(
219             $data->[$k]->[1].$path_add,
220             $content_type_name,
221             $payload,
222             $self->verify_hostname
223             );
224 0         0 my $result_with_id;
225 0 0       0 if($result_chil_from_remote->{is_success}){
226 0         0 my $result_chil;
227 0 0       0 if($storable_flg == 1){
228 0         0 $result_chil = $ds->deserialize($result_chil_from_remote->{res});
229             }else{
230 0         0 $result_chil = _msgpack_to_perl($result_chil_from_remote->{res});
231             }
232 0         0 $result_with_id = {id => $k, result => $result_chil->{result}, remote => $data->[$k]->[1], is_success => 1};
233             }else{
234 0         0 $result_with_id = {id => $k, remote => $data->[$k]->[0], is_success => 0, failed_data => $data->[$k]};
235             }
236 0         0 $pm->finish(0,$result_with_id);
237             }
238             }else{
239 5         20 for(my $k=0; $k <= $#$data; $k++){
240 14 100       8070 $pm->start and next;
241 4         6102 my $result_chil = $mapper_ref->($data->[$k]);
242 4         4417 my $result_with_id = {id => $k, result => $result_chil, is_success => 1, remote => 'LOCAL'};
243 4         98 $pm->finish(0,$result_with_id);
244             }
245             }
246 1         779 $pm->wait_all_children;
247 1         14 my $result_failover;
248 1 50 33     8 if($remote_flg == 1 and $#$failed_data >= 0){
249 0         0 my @succeeded_remotes_list;
250 0         0 foreach my $key (keys %$succeeded_remotes){
251 0         0 push(@succeeded_remotes_list,$key);
252             }
253 0         0 my $pm2 = Parallel::ForkManager->new($max_proc);
254             $pm2->run_on_finish(
255             sub {
256 0     0   0 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure) = @_;
257 0 0       0 if (defined $data_structure) {
258 0 0       0 if($data_structure->{is_success} == 1){
259 0         0 $succeeded_remotes->{$data_structure->{remote}} = 1;
260 0         0 $result_failover->[$data_structure->{id}] = $data_structure->{result};
261             }else{
262 0         0 $failed_remotes->{$data_structure->{remote}} = 1;
263 0         0 push(@$discarded_data,$data_structure->{failed_data});
264 0         0 $result_failover->[$data_structure->{id}] = undef;
265             }
266             }
267             }
268 0         0 );
269              
270 0         0 for(my $k=0; $k <= $#$failed_data; $k++){
271 0 0       0 $pm2->start and next;
272 0         0 my $payload;
273 0 0       0 if($storable_flg == 1){
274 0         0 $payload = $ds->serialize(
275             {
276             data => $failed_data->[$k]->[0],
277             code => $stringified_code
278             }
279             );
280             }else{
281 0         0 $payload = _perl_to_msgpack(
282             {
283             data => $failed_data->[$k]->[0],
284             code => $stringified_code
285             }
286             );
287             }
288              
289 0         0 my $path_add = '';
290 0 0       0 if($storable_flg == 1){ $path_add = '/STORABLE' };
  0         0  
291 0         0 my $content_type_name = 'application/x-msgpack; charset=x-user-defined';
292 0 0       0 if($storable_flg == 1){ $content_type_name = 'application/octet-stream; charset=x-user-defined'; };
  0         0  
293 0         0 my $rand_remote = $succeeded_remotes_list[int(rand($#succeeded_remotes_list))];
294 0         0 my $result_chil_from_remote = _post_content(
295             $rand_remote.$path_add,
296             $content_type_name,
297             $payload,
298             $self->verify_hostname
299             );
300 0         0 my $result_with_id;
301 0 0       0 if($result_chil_from_remote->{is_success}){
302 0         0 my $result_chil;
303 0 0       0 if($storable_flg == 1){
304 0         0 $result_chil = $ds->deserialize($result_chil_from_remote->{res});
305             }else{
306 0         0 $result_chil = _msgpack_to_perl($result_chil_from_remote->{res});
307             }
308 0         0 $result_with_id = {id => $#$data + $k, result => $result_chil->{result}, remote => $rand_remote, is_success => 1};
309             }else{
310 0         0 $result_with_id = {id => $#$data + $k, remote => $rand_remote, is_success => 0, failed_data => $failed_data->[$k]};
311             }
312 0         0 $pm2->finish(0,$result_with_id);
313             }
314 0         0 $pm2->wait_all_children;
315             }
316 1         3 my $result_merged;
317 1         9 push(@$result_merged,@$result);
318 1 50       9 if($#$result_failover >= 0){
319 0         0 push(@$result_merged,@$result_failover);
320             }
321 1 50       9 if($#$discarded_data >= 0){
322 0 0       0 if($self->die_discarded_data == 1){
    0          
323 0         0 die "Fatal: Discarded data exist due to remote server couldn't process requested data.\n";
324             }elsif($self->warn_discarded_data == 1){
325 0         0 warn "Warning: Discarded data exist.\n";
326             }
327             }
328 1 50       14 if($self->skip_undef_result == 1){
329 1         5 my $result_skip;
330 1         9 for(0 .. $#$result_merged){
331 4 50       15 if(defined($result_merged->[$_])){
332 4         10 push(@$result_skip,$result_merged->[$_]);
333             }
334             }
335 1         8 return($reducer_ref->($result_skip));
336             }else{
337 0           return($reducer_ref->($result_merged));
338             }
339             }
340              
341             sub worker {
342 0     0 1   my $self = shift;
343 0           my $path = shift;
344 0           my $worker = shift;
345 0           my $port = shift;
346              
347 0           $self->{path} = $path;
348              
349 0 0         unless(defined($worker)){
350 0           $worker = 4;
351             }
352 0           $self->{worker_num} = $worker;
353              
354 0 0         unless(defined($port)){
355 0           $port = 5000;
356             }
357 0           $self->{port} = $port;
358              
359 0           my $rc = eval{
360 0           require Plack::Handler::Starlet;
361 0           1;
362             };
363 0 0 0       if($rc and $self->force_plackup == 0){
364 0           print "Starting MapReduce Framework Worker by Starlet\n";
365 0           print "Path: $path\nPort: $port\n";
366 0           my $app = $self->load_worker_plack_app($path);
367 0           my $handler = Plack::Handler::Starlet->new(
368             max_workers => $worker,
369             port => $port
370             );
371 0           $handler->run($app);
372             }else{
373 0           $self->{worker_num} = 1;
374 0           require Plack::Runner;
375 0           my $runner = Plack::Runner->new;
376 0           print "Starting MapReduce Framework Worker by plackup. The number of workers will be ignored.\n";
377 0           print "Path: $path\nPort: $port\n";
378 0           my $app = $self->load_worker_plack_app($path);
379 0           $runner->parse_options('--port',$port);
380 0           $runner->run($app);
381             }
382             }
383              
384             sub load_worker_plack_app {
385 0     0 1   my $self = shift;
386 0           my $path = shift;
387             my $app = sub {
388 0     0     my $env = shift;
389 0           my $req = Plack::Request->new($env);
390 0 0         if($self->worker_log == 1){
391 0           print "REQ,$$,".$req->address.',';
392 0           my @tar = localtime(time());
393 0           printf(
394             "%04d-%02d-%02d %02d:%02d:%02d",
395             $tar[5]+1900,$tar[4]+1,$tar[3],$tar[2],$tar[1],$tar[0]
396             );
397 0           print "\n";
398             }
399             my $response = {
400             $path => sub {
401 0   0       my $msg_req = $req->content //
402             return [400,['Content-Type' => 'text/plain'],['Content body required.']];
403 0   0       my $perl_req = _msgpack_to_perl($msg_req) //
404             return [400,['Content-Type' => 'text/plain'],['Valid MessagePack required']];
405 0           my $data = $perl_req->{data};
406 0           my $code_text = $perl_req->{code};
407 0           my $code_ref;
408 0           eval('$code_ref = sub '.$code_text.';');
409 0           my $result = $code_ref->($data);
410 0           return [200,['Content-Type' => 'application/x-msgpack; charset=x-user-defined'],[_perl_to_msgpack({result => $result})]];
411             },
412             $path.'/server_spec' => sub {
413 0           my $server_spec = $self->{server_spec};
414 0           $server_spec->{worker_num} = $self->{worker_num};
415 0           $server_spec->{port} = $self->{port};
416 0           $server_spec->{path} = $self->{path};
417              
418 0           return [200,['Content-Type' => 'application/x-msgpack; charset=x-user-defined'],[_perl_to_msgpack({server_spec => $server_spec})]];
419             },
420             $path.'/STORABLE' => sub {
421 0           my $ds = Data::Serializer->new(
422             serializer => 'Storable',
423             compress => 1
424             );
425 0   0       my $msg_req = $req->content //
426             return [400,['Content-Type' => 'text/plain'],['Content body required.']];
427 0   0       my $perl_req = $ds->deserialize($msg_req) //
428             return [400,['Content-Type' => 'text/plain'],['Valid MessagePack required']];
429 0           my $data = $perl_req->{data};
430 0           my $code_text = $perl_req->{code};
431 0           my $code_ref;
432 0           eval('$code_ref = sub '.$code_text.';');
433 0           my $result_pre = $code_ref->($data);
434 0           my $result = $ds->serialize({result => $result_pre});
435 0           return [200,['Content-Type' => 'application/octet-stream; charset=x-user-defined'],[$result]];
436             },
437 0           };
438 0 0         if($self->worker_log == 1){
439 0           print "END,$$,".$req->address.',';
440 0           my @tar = localtime(time());
441 0           printf(
442             "%04d-%02d-%02d %02d:%02d:%02d",
443             $tar[5]+1900,$tar[4]+1,$tar[3],$tar[2],$tar[1],$tar[0]
444             );
445 0           print "\n";
446             }
447 0 0         if(defined($response->{$env->{PATH_INFO}})){
448 0           return $response->{$env->{PATH_INFO}}->();
449             }else{
450 0           return [404,['Content-Type' => 'text/plain'],['Not Found']];
451             }
452              
453 0           };
454 0           return($app);
455             }
456              
457             sub check_server_spec {
458 0     0 0   my $self = shift;
459 0           my $server_list = shift;
460 0           my $output;
461 0           for(my $k=0; $k <= $#$server_list; $k++){
462 0           my $result_chil_from_remote = _post_content(
463             $server_list->[$k] . '/server_spec',
464             'application/x-msgpack; charset=x-user-defined',
465             '',
466             $self->verify_hostname
467             );
468 0 0         if($result_chil_from_remote->{is_success}){
469 0           my $tmp_spec = _msgpack_to_perl($result_chil_from_remote->{res});
470 0           $output->{$server_list->[$k]} = $tmp_spec;
471             }
472             }
473 0           return $output;
474             }
475              
476             sub _post_content {
477 0     0     my $url = shift;
478 0           my $content_type = shift;
479 0           my $data = shift;
480 0           my $ssl_opt = shift;
481 0           my $ua = WWW::Mechanize->new(
482             ssl_opts => {
483             verify_hostname => $ssl_opt
484             }
485             );
486 0           my $is_success = 1;
487 0           eval{
488 0           $ua->post($url,'Content-Type' => $content_type, Content => $data);
489             };
490 0 0         if($@){
491 0           $is_success = 0;
492             }
493 0           my $res = $ua->content();
494 0           return {res => $res, is_success => $is_success};
495             }
496              
497             sub _perl_to_msgpack {
498 0     0     my $data = shift;
499 0           my $msgpack = Data::MessagePack->new();
500 0           my $packed = $msgpack->pack($data);
501 0           return($packed);
502             }
503              
504             sub _msgpack_to_perl {
505 0     0     my $msg_text = shift;
506 0           my $msgpack = Data::MessagePack->new();
507 0           my $unpacked = $msgpack->unpack($msg_text);
508 0           return($unpacked);
509             }
510              
511              
512              
513             __PACKAGE__->meta->make_immutable();
514              
515             1;
516             __END__