File Coverage

blib/lib/MapReduce/Framework/Simple.pm
Criterion Covered Total %
statement 69 283 24.3
branch 11 72 15.2
condition 2 13 15.3
subroutine 12 21 57.1
pod 4 5 80.0
total 98 394 24.8


line stmt bran cond sub pod time code
1             package MapReduce::Framework::Simple;
2 6     6   66997 use 5.010001;
  6         56  
3 6     6   30 use strict;
  6         17  
  6         106  
4 6     6   25 use warnings;
  6         11  
  6         139  
5 6     6   34 use B::Deparse;
  6         17  
  6         138  
6 6     6   2256 use Mouse;
  6         121056  
  6         26  
7 6     6   4246 use Data::MessagePack;
  6         5170  
  6         151  
8 6     6   2563 use Parallel::ForkManager;
  6         153756  
  6         189  
9 6     6   2479 use Plack::Request;
  6         329511  
  6         185  
10 6     6   3503 use WWW::Mechanize;
  6         540028  
  6         242  
11 6     6   84 use List::Util qw(shuffle);
  6         13  
  6         12852  
12              
13             our $VERSION = "0.08";
14              
15             has 'verify_hostname' => (is => 'rw', isa => 'Int', default => 1);
16             has 'skip_undef_result' => (is => 'rw', isa => 'Int', default => 1);
17             has 'warn_discarded_data' => (is => 'rw', isa => 'Int', default => 1);
18             has 'die_discarded_data' => (is => 'rw', isa => 'Int', default => 0);
19             has 'worker_log' => (is => 'rw', isa => 'Int', default => 0);
20             has 'force_plackup' => (is => 'rw', isa => 'Int', default => 0);
21             has 'server_spec' => (is => 'rw', isa => 'HashRef', default => sub{{cores => 1, clock => 2000, mem => 1}});
22             has 'worker_num' => (is => 'rw', isa => 'Int', default => 1);
23             has 'port' => (is => 'rw', isa => 'Int', default => 5000);
24             has 'path' => (is => 'rw', isa => 'Str', default => '/eval');
25              
26              
27             # To make load balanced data.
28             sub create_assigned_data {
29 0     0 1 0 my $self = shift;
30 0         0 my $data = shift;
31 0         0 my $servers = shift;
32 0         0 my $options = shift;
33 0         0 my $chunk_num = 10;
34 0         0 my $method = 'volume_uniform';
35 0 0       0 if(defined($options)){
36 0 0       0 if(defined($options->{chunk_num})){
37 0         0 $chunk_num = $options->{chunk_num};
38             }
39 0 0       0 if(defined($options->{method})){
40 0         0 $method = $options->{method};
41             }
42             }
43 0         0 my $output;
44 0 0       0 if($method eq 'element_shuffle'){
    0          
    0          
    0          
    0          
45              
46 0         0 @$data = shuffle(@$data);
47 0         0 for(0 .. $#$data){
48 0         0 push(@{$output->[$_ % $chunk_num]->[0]},$data->[$_]);
  0         0  
49             }
50 0         0 for(0 .. $#$output){
51 0         0 $output->[$_]->[1] = $servers->[$_ % scalar(@$servers)];
52             }
53             }elsif($method eq 'element_sequential'){
54 0         0 for(0 .. $#$data){
55 0         0 push(@{$output->[$_ % $chunk_num]->[0]},$data->[$_]);
  0         0  
56             }
57 0         0 for(0 .. $#$output){
58 0         0 $output->[$_]->[1] = $servers->[$_ % scalar(@$servers)];
59             }
60             }elsif($method eq 'element_server_cores'){
61 0         0 @$data = shuffle(@$data);
62 0         0 my $server_spec = $self->check_server_spec($servers);
63 0         0 my $tmp_server_list;
64 0         0 foreach my $key (keys %$server_spec){
65 0         0 push(@$tmp_server_list,$key);
66 0         0 for(2 .. $server_spec->{$key}->{server_spec}->{cores}){
67 0         0 push(@$tmp_server_list,$key);
68             }
69             }
70 0         0 for(0 .. $#$data){
71 0         0 push(@{$output->[$_ % scalar(@$tmp_server_list)]->[0]},$data->[$_]);
  0         0  
72             }
73 0         0 for(0 .. $#$output){
74 0         0 $output->[$_]->[1] = $tmp_server_list->[$_ % scalar(@$tmp_server_list)];
75             }
76             }elsif($method eq 'element_server_workers'){
77 0         0 @$data = shuffle(@$data);
78 0         0 my $server_spec = $self->check_server_spec($servers);
79 0         0 my $tmp_server_list;
80 0         0 foreach my $key (keys %$server_spec){
81 0         0 push(@$tmp_server_list,$key);
82 0         0 for(2 .. $server_spec->{$key}->{server_spec}->{worker_num}){
83 0         0 push(@$tmp_server_list,$key);
84             }
85             }
86 0         0 for(0 .. $#$data){
87 0         0 push(@{$output->[$_ % scalar(@$tmp_server_list)]->[0]},$data->[$_]);
  0         0  
88             }
89 0         0 for(0 .. $#$output){
90 0         0 $output->[$_]->[1] = $tmp_server_list->[$_ % scalar(@$tmp_server_list)];
91             }
92             }elsif($method eq 'element_server_core_clock'){
93 0         0 @$data = shuffle(@$data);
94 0         0 my $server_spec = $self->check_server_spec($servers);
95 0         0 my $tmp_server_list;
96              
97 0         0 my $tmp_max_cc = 0;
98 0         0 my $tmp_min_cc = 1_000_000_000_000;
99 0         0 foreach my $key (keys %$server_spec){
100             my $tmp_cc = ($server_spec->{$key}->{server_spec}->{cores} + 0.0001)
101 0         0 * ($server_spec->{$key}->{server_spec}->{clock} + 0.0001);
102 0 0       0 if($tmp_max_cc < $tmp_cc){
103 0         0 $tmp_max_cc = $tmp_cc;
104             }
105 0 0       0 if($tmp_min_cc > $tmp_cc){
106 0         0 $tmp_min_cc = $tmp_cc;
107             }
108             }
109              
110 0         0 my $pre_ratio = ((log($tmp_max_cc) / log(10))+(log($tmp_max_cc) / log(10)));
111 0         0 my $ratio = 1/ 10 ** int($pre_ratio / 2);
112 0         0 foreach my $key (keys %$server_spec){
113 0         0 push(@$tmp_server_list,$key);
114             my $tmp_cc = $server_spec->{$key}->{server_spec}->{cores}
115 0         0 * $server_spec->{$key}->{server_spec}->{clock};
116 0         0 my $add_num = int($tmp_cc * $ratio);
117 0         0 for(2 .. $add_num){
118 0         0 push(@$tmp_server_list,$key);
119             }
120             }
121 0         0 for(0 .. $#$data){
122 0         0 push(@{$output->[$_ % scalar(@$tmp_server_list)]->[0]},$data->[$_]);
  0         0  
123             }
124 0         0 for(0 .. $#$output){
125 0         0 $output->[$_]->[1] = $tmp_server_list->[$_ % scalar(@$tmp_server_list)];
126             }
127             }else{
128 0         0 my $mp = Data::MessagePack->new();
129             @$data = map $_->[0],
130 0         0 sort {$a->[1] <=> $b->[1]} map [$_, bytes::length $mp->pack($_)],
  0         0  
131             @$data;
132 0         0 for(0 .. $#$data){
133 0         0 push(@{$output->[$_ % $chunk_num]->[0]},$data->[$_]);
  0         0  
134             }
135 0         0 for(0 .. $#$output){
136 0         0 $output->[$_]->[1] = $servers->[$_ % scalar(@$servers)];
137             }
138             }
139 0         0 return($output);
140             }
141              
142             # MapReduce client(Master)
143             sub map_reduce {
144 5     5 1 19260 my $self = shift;
145 5         10 my $data = shift;
146 5         15 my $mapper_ref = shift;
147 5         10 my $reducer_ref = shift;
148 5         10 my $max_proc = shift;
149 5         10 my $options = shift;
150 5         10 my $remote_flg = 1;
151 5 50 33     45 if(defined($options) and defined($options->{remote})){
152 5         15 $remote_flg = $options->{remote};
153             }
154 5         14225 my $stringified_code = B::Deparse->new->coderef2text($mapper_ref);
155 5         60 my $result;
156             my $succeeded_remotes;
157 5         0 my $failed_remotes;
158 5         0 my $failed_data;
159 5         0 my $discarded_data;
160 5         130 my $pm = Parallel::ForkManager->new($max_proc);
161             $pm->run_on_finish(
162             sub {
163 4     4   1002354 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure) = @_;
164 4 50       45 if (defined $data_structure) {
165 4 50       16 if($data_structure->{is_success} == 1){
166 4         15 $succeeded_remotes->{$data_structure->{remote}} = 1;
167 4         18 $result->[$data_structure->{id}] = $data_structure->{result};
168             }else{
169 0         0 $failed_remotes->{$data_structure->{remote}} = 1;
170 0         0 push(@$failed_data,$data_structure->{failed_data});
171 0         0 $result->[$data_structure->{id}] = undef;
172             }
173             }
174             }
175 5         2900 );
176 5 50       75 if($remote_flg == 1){
177 0         0 for(my $k=0; $k <= $#$data; $k++){
178 0 0       0 $pm->start and next;
179 0         0 my $payload = _perl_to_msgpack(
180             {
181             data => $data->[$k]->[0],
182             code => $stringified_code
183             }
184             );
185 0         0 my $result_chil_from_remote = _post_content(
186             $data->[$k]->[1],
187             'application/x-msgpack; charset=x-user-defined',
188             $payload,
189             $self->verify_hostname
190             );
191 0         0 my $result_with_id;
192 0 0       0 if($result_chil_from_remote->{is_success}){
193 0         0 my $result_chil = _msgpack_to_perl($result_chil_from_remote->{res});
194 0         0 $result_with_id = {id => $k, result => $result_chil->{result}, remote => $data->[$k]->[1], is_success => 1};
195             }else{
196 0         0 $result_with_id = {id => $k, remote => $data->[$k]->[0], is_success => 0, failed_data => $data->[$k]};
197             }
198 0         0 $pm->finish(0,$result_with_id);
199             }
200             }else{
201 5         25 for(my $k=0; $k <= $#$data; $k++){
202 14 100       7505 $pm->start and next;
203 4         5908 my $result_chil = $mapper_ref->($data->[$k]);
204 4         5915 my $result_with_id = {id => $k, result => $result_chil, is_success => 1, remote => 'LOCAL'};
205 4         459 $pm->finish(0,$result_with_id);
206             }
207             }
208 1         733 $pm->wait_all_children;
209 1         11 my $result_failover;
210 1 50 33     9 if($remote_flg == 1 and $#$failed_data >= 0){
211 0         0 my @succeeded_remotes_list;
212 0         0 foreach my $key (keys %$succeeded_remotes){
213 0         0 push(@succeeded_remotes_list,$key);
214             }
215 0         0 my $pm2 = Parallel::ForkManager->new($max_proc);
216             $pm2->run_on_finish(
217             sub {
218 0     0   0 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure) = @_;
219 0 0       0 if (defined $data_structure) {
220 0 0       0 if($data_structure->{is_success} == 1){
221 0         0 $succeeded_remotes->{$data_structure->{remote}} = 1;
222 0         0 $result_failover->[$data_structure->{id}] = $data_structure->{result};
223             }else{
224 0         0 $failed_remotes->{$data_structure->{remote}} = 1;
225 0         0 push(@$discarded_data,$data_structure->{failed_data});
226 0         0 $result_failover->[$data_structure->{id}] = undef;
227             }
228             }
229             }
230 0         0 );
231              
232 0         0 for(my $k=0; $k <= $#$failed_data; $k++){
233 0 0       0 $pm2->start and next;
234 0         0 my $payload = _perl_to_msgpack(
235             {
236             data => $failed_data->[$k]->[0],
237             code => $stringified_code
238             }
239             );
240 0         0 my $rand_remote = $succeeded_remotes_list[int(rand($#succeeded_remotes_list))];
241 0         0 my $result_chil_from_remote = _post_content(
242             $rand_remote,
243             'application/x-msgpack; charset=x-user-defined',
244             $payload,
245             $self->verify_hostname
246             );
247 0         0 my $result_with_id;
248 0 0       0 if($result_chil_from_remote->{is_success}){
249 0         0 my $result_chil = _msgpack_to_perl($result_chil_from_remote->{res});
250 0         0 $result_with_id = {id => $#$data + $k, result => $result_chil->{result}, remote => $rand_remote, is_success => 1};
251             }else{
252 0         0 $result_with_id = {id => $#$data + $k, remote => $rand_remote, is_success => 0, failed_data => $failed_data->[$k]};
253             }
254 0         0 $pm2->finish(0,$result_with_id);
255             }
256 0         0 $pm2->wait_all_children;
257             }
258 1         4 my $result_merged;
259 1         4 push(@$result_merged,@$result);
260 1 50       5 if($#$result_failover >= 0){
261 0         0 push(@$result_merged,@$result_failover);
262             }
263 1 50       6 if($#$discarded_data >= 0){
264 0 0       0 if($self->die_discarded_data == 1){
    0          
265 0         0 die "Fatal: Discarded data exist due to remote server couldn't process requested data.\n";
266             }elsif($self->warn_discarded_data == 1){
267 0         0 warn "Warning: Discarded data exist.\n";
268             }
269             }
270 1 50       11 if($self->skip_undef_result == 1){
271 1         2 my $result_skip;
272 1         7 for(0 .. $#$result_merged){
273 4 50       10 if(defined($result_merged->[$_])){
274 4         11 push(@$result_skip,$result_merged->[$_]);
275             }
276             }
277 1         9 return($reducer_ref->($result_skip));
278             }else{
279 0           return($reducer_ref->($result_merged));
280             }
281             }
282              
283             sub worker {
284 0     0 1   my $self = shift;
285 0           my $path = shift;
286 0           my $worker = shift;
287 0           my $port = shift;
288              
289 0           $self->{path} = $path;
290              
291 0 0         unless(defined($worker)){
292 0           $worker = 4;
293             }
294 0           $self->{worker_num} = $worker;
295              
296 0 0         unless(defined($port)){
297 0           $port = 5000;
298             }
299 0           $self->{port} = $port;
300              
301 0           my $rc = eval{
302 0           require Plack::Handler::Starlet;
303 0           1;
304             };
305 0 0 0       if($rc and $self->force_plackup == 0){
306 0           print "Starting MapReduce Framework Worker by Starlet\n";
307 0           print "Path: $path\nPort: $port\n";
308 0           my $app = $self->load_worker_plack_app($path);
309 0           my $handler = Plack::Handler::Starlet->new(
310             max_workers => $worker,
311             port => $port
312             );
313 0           $handler->run($app);
314             }else{
315 0           $self->{worker_num} = 1;
316 0           require Plack::Runner;
317 0           my $runner = Plack::Runner->new;
318 0           print "Starting MapReduce Framework Worker by plackup. The number of workers will be ignored.\n";
319 0           print "Path: $path\nPort: $port\n";
320 0           my $app = $self->load_worker_plack_app($path);
321 0           $runner->parse_options('--port',$port);
322 0           $runner->run($app);
323             }
324             }
325              
326             sub load_worker_plack_app {
327 0     0 1   my $self = shift;
328 0           my $path = shift;
329             my $app = sub {
330 0     0     my $env = shift;
331 0           my $req = Plack::Request->new($env);
332 0 0         if($self->worker_log == 1){
333 0           print "REQ,$$,".$req->address.',';
334 0           my @tar = localtime(time());
335 0           printf(
336             "%04d-%02d-%02d %02d:%02d:%02d",
337             $tar[5]+1900,$tar[4]+1,$tar[3],$tar[2],$tar[1],$tar[0]
338             );
339 0           print "\n";
340             }
341             my $response = {
342             $path => sub {
343 0   0       my $msg_req = $req->content //
344             return [400,['Content-Type' => 'text/plain'],['Content body required.']];
345 0   0       my $perl_req = _msgpack_to_perl($msg_req) //
346             return [400,['Content-Type' => 'text/plain'],['Valid MessagePack required']];
347 0           my $data = $perl_req->{data};
348 0           my $code_text = $perl_req->{code};
349 0           my $code_ref;
350 0           eval('$code_ref = sub '.$code_text.';');
351 0           my $result = $code_ref->($data);
352 0           return [200,['Content-Type' => 'application/x-msgpack; charset=x-user-defined'],[_perl_to_msgpack({result => $result})]];
353             },
354             $path.'/server_spec' => sub {
355 0           my $server_spec = $self->{server_spec};
356 0           $server_spec->{worker_num} = $self->{worker_num};
357 0           $server_spec->{port} = $self->{port};
358 0           $server_spec->{path} = $self->{path};
359              
360 0           return [200,['Content-Type' => 'application/x-msgpack; charset=x-user-defined'],[_perl_to_msgpack({server_spec => $server_spec})]];
361             }
362 0           };
363 0 0         if($self->worker_log == 1){
364 0           print "END,$$,".$req->address.',';
365 0           my @tar = localtime(time());
366 0           printf(
367             "%04d-%02d-%02d %02d:%02d:%02d",
368             $tar[5]+1900,$tar[4]+1,$tar[3],$tar[2],$tar[1],$tar[0]
369             );
370 0           print "\n";
371             }
372 0 0         if(defined($response->{$env->{PATH_INFO}})){
373 0           return $response->{$env->{PATH_INFO}}->();
374             }else{
375 0           return [404,['Content-Type' => 'text/plain'],['Not Found']];
376             }
377              
378 0           };
379 0           return($app);
380             }
381              
382             sub check_server_spec {
383 0     0 0   my $self = shift;
384 0           my $server_list = shift;
385 0           my $output;
386 0           for(my $k=0; $k <= $#$server_list; $k++){
387 0           my $result_chil_from_remote = _post_content(
388             $server_list->[$k] . '/server_spec',
389             'application/x-msgpack; charset=x-user-defined',
390             '',
391             $self->verify_hostname
392             );
393 0 0         if($result_chil_from_remote->{is_success}){
394 0           my $tmp_spec = _msgpack_to_perl($result_chil_from_remote->{res});
395 0           $output->{$server_list->[$k]} = $tmp_spec;
396             }
397             }
398 0           return $output;
399             }
400              
401             sub _post_content {
402 0     0     my $url = shift;
403 0           my $content_type = shift;
404 0           my $data = shift;
405 0           my $ssl_opt = shift;
406 0           my $ua = WWW::Mechanize->new(
407             ssl_opts => {
408             verify_hostname => $ssl_opt
409             }
410             );
411 0           my $is_success = 1;
412 0           eval{
413 0           $ua->post($url,'Content-Type' => $content_type, Content => $data);
414             };
415 0 0         if($@){
416 0           $is_success = 0;
417             }
418 0           my $res = $ua->content();
419 0           return {res => $res, is_success => $is_success};
420             }
421              
422             sub _perl_to_msgpack {
423 0     0     my $data = shift;
424 0           my $msgpack = Data::MessagePack->new();
425 0           my $packed = $msgpack->pack($data);
426 0           return($packed);
427             }
428              
429             sub _msgpack_to_perl {
430 0     0     my $msg_text = shift;
431 0           my $msgpack = Data::MessagePack->new();
432 0           my $unpacked = $msgpack->unpack($msg_text);
433 0           return($unpacked);
434             }
435              
436              
437              
438             __PACKAGE__->meta->make_immutable();
439              
440             1;
441             __END__