File Coverage

blib/lib/IO/Storm/Component.pm
Criterion Covered Total %
statement 92 118 77.9
branch 9 26 34.6
condition 0 6 0.0
subroutine 20 22 90.9
pod 6 8 75.0
total 127 180 70.5


line stmt bran cond sub pod time code
1             # ABSTRACT: The base class for Bolts and Spouts.
2              
3             package IO::Storm::Component;
4             $IO::Storm::Component::VERSION = '0.15';
5             # Imports
6 3     3   103820 use strict;
  3         5  
  3         108  
7 3     3   29 use warnings;
  3         5  
  3         77  
8 3     3   28 use v5.10;
  3         9  
  3         183  
9 3     3   14 use IO::Handle qw(autoflush);
  3         5  
  3         272  
10 3     3   1421 use IO::File;
  3         2272  
  3         366  
11 3     3   17 use Log::Log4perl qw(:easy);
  3         3  
  3         23  
12 3     3   1479 use JSON::XS;
  3         3  
  3         138  
13 3     3   13 use Data::Dumper;
  3         3  
  3         130  
14 3     3   19 use IO::Storm::Tuple;
  3         4  
  3         63  
15              
16             # Setup Moo for object-oriented niceties
17 3     3   11 use Moo;
  3         3  
  3         22  
18 3     3   875 use namespace::clean;
  3         3  
  3         19  
19              
20             # Setup STDIN/STDOUT/STDERR to use UTF8
21             binmode STDERR, ':utf8';
22 3     3   15 binmode STDIN, ':encoding(UTF-8)';
  3         4  
  3         22  
23             binmode STDOUT, ':utf8';
24              
25             has '_pending_commands' => (
26             is => 'rw',
27             default => sub { [] },
28             );
29              
30             has '_pending_taskids' => (
31             is => 'rw',
32             default => sub { [] },
33             );
34              
35             has '_stdin' => (
36             is => 'rw',
37             default => sub {
38             my $io = IO::Handle->new;
39             $io->fdopen( fileno(STDIN), 'r' );
40             }
41             );
42              
43             has 'max_lines' => (
44             is => 'rw',
45             default => 100
46             );
47              
48             has 'max_blank_msgs' => (
49             is => 'rw',
50             default => 500
51             );
52              
53             has '_json' => (
54             is => 'rw',
55             default => sub { JSON::XS->new->allow_blessed->convert_blessed }
56             );
57              
58             has '_topology_name' => (
59             is => 'rw',
60             init_args => undef
61             );
62              
63             has '_task_id' => (
64             is => 'rw',
65             init_args => undef
66             );
67              
68             has '_component_name' => (
69             is => 'rw',
70             init_args => undef
71             );
72              
73             has '_debug' => (
74             is => 'rw',
75             init_args => undef
76             );
77              
78             has '_storm_conf' => (
79             is => 'rw',
80             init_args => undef
81             );
82              
83             has '_context' => (
84             is => 'rw',
85             init_args => undef
86             );
87              
88             my $logger = Log::Log4perl->get_logger('storm');
89              
90              
91             sub _setup_component {
92 0     0   0 my ( $self, $storm_conf, $context ) = @_;
93 0         0 my $conf_is_hash = ref($storm_conf) eq ref {};
94 0 0 0     0 $self->_topology_name(
95             ( $conf_is_hash && exists( $storm_conf->{'topology.name'} ) )
96             ? $storm_conf->{'topology.name'}
97             : ''
98             );
99 0 0       0 $self->_task_id( exists( $context->{taskid} ) ? $context->{taskid} : '' );
100 0         0 $self->_component_name('');
101 0 0       0 if ( exists( $context->{'task->component'} ) ) {
102 0         0 my $task_component = $context->{'task->component'};
103 0 0       0 if ( exists( $task_component->{ $self->_task_id } ) ) {
104 0         0 $self->_component_name( $task_component->{ $self->_task_id } );
105             }
106             }
107             $self->_debug(
108 0 0 0     0 ( $conf_is_hash && exists( $storm_conf->{'topology.debug'} ) )
109             ? $storm_conf->{'topology.debug'}
110             : 0
111             );
112 0         0 $self->_storm_conf($storm_conf);
113 0         0 $self->_context($context);
114             }
115              
116              
117             sub read_message {
118 13     13 1 65 $logger->debug('start read_message');
119 13         81 my $self = shift;
120 13         14 my $blank_lines = 0;
121 13         16 my $message_size = 0;
122 13         16 my $line = '';
123              
124 13         18 my @messages = ();
125 13         12 while (1) {
126 26         135 $line = $self->_stdin->getline;
127 26 50       1160 if ( defined($line) ) {
128 26         79 $logger->debug("read_message: line=$line");
129             }
130             else {
131 0         0 $logger->error( "Received EOF while trying to read stdin from "
132             . "Storm, pipe appears to be broken, exiting." );
133 0         0 exit(1);
134             }
135 26 100       162 if ( $line eq "end\n" ) {
    50          
    50          
136 13         16 last;
137             }
138             elsif ( $line eq '' ) {
139 0         0 $logger->error( "Received EOF while trying to read stdin from "
140             . "Storm, pipe appears to be broken, exiting." );
141 0         0 exit(1);
142             }
143             elsif ( $line eq "\n" ) {
144 0         0 $blank_lines++;
145 0 0       0 if ( $blank_lines % 1000 == 0 ) {
146 0         0 $logger->warn( "While trying to read a command or pending "
147             . "task ID, Storm has instead sent $blank_lines "
148             . "'\\n' messages." );
149 0         0 next;
150             }
151             }
152 13         21 chomp($line);
153 13         15 push( @messages, $line );
154             }
155              
156 13         126 return $self->_json->decode( join( "\n", @messages ) );
157             }
158              
159             sub read_task_ids {
160 7     7 0 807 my $self = shift;
161              
162 7 50       10 if ( scalar( @{ $self->_pending_taskids } ) ) {
  7         38  
163 0         0 return shift( @{ $self->_pending_taskids } );
  0         0  
164             }
165             else {
166 7         23 my $msg = $self->read_message();
167 7         25 while ( ref($msg) ne 'ARRAY' ) {
168 3         5 push( @{ $self->_pending_commands }, $msg );
  3         12  
169 3         6 $msg = $self->read_message();
170             }
171              
172 7         25 return $msg;
173             }
174             }
175              
176             sub read_command {
177 3     3 0 30 my $self = shift;
178              
179 3 100       4 if ( @{ $self->_pending_commands } ) {
  3         13  
180 2         3 return shift( @{ $self->_pending_commands } );
  2         8  
181             }
182             else {
183 1         3 my $msg = $self->read_message();
184 1         8 while ( ref($msg) eq 'ARRAY' ) {
185 0         0 push( @{ $self->_pending_taskids }, $msg );
  0         0  
186 0         0 $msg = $self->read_message();
187             }
188 1         3 return $msg;
189             }
190             }
191              
192              
193             sub read_tuple {
194 2     2 1 1197 my $self = shift;
195 2         13 $logger->debug('read_tuple');
196              
197 2         16 my $tupmap = $self->read_command();
198              
199 2         19 return IO::Storm::Tuple->new(
200             id => $tupmap->{id},
201             component => $tupmap->{comp},
202             stream => $tupmap->{stream},
203             task => $tupmap->{task},
204             values => $tupmap->{tuple}
205             );
206             }
207              
208              
209             sub read_handshake {
210 1     1 1 3109 my $self = shift;
211              
212             # TODO: Figure out how to redirect stdout to ensure that print
213             # statements/functions won't crash the Storm Java worker
214              
215 1         10 autoflush STDOUT 1;
216 1         43 autoflush STDERR 1;
217              
218 1         19 my $msg = $self->read_message();
219             $logger->debug(
220 1     0   6 sub { 'Received initial handshake from Storm: ' . Dumper($msg) } );
  0         0  
221              
222             # Write a blank PID file out to the pidDir
223 1         8 my $pid = $$;
224 1         2 my $pid_dir = $msg->{pidDir};
225 1         2 my $filename = $pid_dir . '/' . $pid;
226 1 50       76 open my $fh, '>', $filename
227             or die "Cant't write to '$filename': $!\n";
228 1         9 $fh->close;
229 1         15 $logger->debug("Sending process ID $pid to Storm");
230 1         9 $self->send_message( { pid => int($pid) } );
231              
232 1         10 return [ $msg->{conf}, $msg->{context} ];
233             }
234              
235              
236             sub send_message {
237 11     11 1 3490 my ( $self, $msg ) = @_;
238 11         460 say $self->_json->encode($msg);
239 11         97 say "end";
240             }
241              
242              
243             sub sync {
244 1     1 1 1243 my $self = shift;
245 1         6 $self->send_message( { command => 'sync' } );
246             }
247              
248              
249             sub log {
250 1     1 1 1388 my ( $self, $message ) = @_;
251 1         7 $self->send_message( { command => 'log', msg => $message } );
252             }
253              
254             1;
255              
256             __END__