File Coverage

blib/lib/IO/Storm/Bolt.pm
Criterion Covered Total %
statement 47 77 61.0
branch 10 24 41.6
condition 2 7 28.5
subroutine 9 14 64.2
pod 6 6 100.0
total 74 128 57.8


line stmt bran cond sub pod time code
1             # ABSTRACT: The base class for all IO::Storm Bolts.
2              
3             package IO::Storm::Bolt;
4             $IO::Storm::Bolt::VERSION = '0.17';
5             # Imports
6 1     1   99773 use strict;
  1         2  
  1         31  
7 1     1   5 use warnings;
  1         1  
  1         28  
8 1     1   9 use v5.10;
  1         2  
  1         26  
9 1     1   3 use Try::Tiny;
  1         1  
  1         53  
10              
11             # Setup Moo for object-oriented niceties
12 1     1   4 use Moo;
  1         2  
  1         7  
13 1     1   299 use namespace::clean;
  1         1  
  1         9  
14              
15             extends 'IO::Storm::Component';
16              
17             # A boolean indicating whether or not the bolt should automatically
18             # anchor emits to the incoming tuple ID. Tuple anchoring is how Storm
19             # provides reliability, you can read more about tuple anchoring in Storm's
20             # docs:
21             # https://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html#what-is-storms-reliability-api
22             has 'auto_anchor' => (
23             is => 'rw',
24             default => 1
25             );
26              
27             # A boolean indicating whether or not the bolt should automatically
28             # acknowledge tuples after ``process()`` is called.
29             has 'auto_ack' => (
30             is => 'rw',
31             default => 1
32             );
33              
34             # A boolean indicating whether or not the bolt should automatically fail
35             # tuples when an exception occurs when the ``process()`` method is called.
36             has 'auto_fail' => (
37             is => 'rw',
38             default => 1
39             );
40              
41             # Using a list so Bolt and subclasses can have more than one current_tup
42             has '_current_tups' => (
43             is => 'rw',
44             default => sub { [] },
45             init_arg => undef
46             );
47              
48             sub initialize {
49 0     0 1 0 my ( $self, $storm_conf, $context ) = @_;
50             }
51              
52             sub process {
53 0     0 1 0 my ( $self, $tuple ) = @_;
54             }
55              
56             sub emit ($$;$) {
57 3     3 1 2985 my ( $self, $tuple, $args ) = @_;
58              
59 3   50     10 $args = $args // {};
60 3         8 my $msg = { command => 'emit', tuple => $tuple };
61              
62 3         4 my $anchors = [];
63 3 50       11 if ( $self->auto_anchor ) {
64 3   50     10 $anchors = $self->_current_tups // [];
65             }
66 3 100       8 unless ( defined( $args->{anchors} ) ) {
67 2         3 $args->{anchors} = $anchors;
68             }
69              
70 3         4 for my $a ( @{ $args->{anchors} } ) {
  3         5  
71 2 50       5 if ( ref($a) eq "IO::Storm::Tuple" ) {
72 0         0 $a = $a->id;
73             }
74 2         5 push( @$anchors, $a );
75             }
76              
77 3 100       8 if ( defined( $args->{stream} ) ) {
78 1         3 $msg->{stream} = $args->{stream};
79             }
80              
81 3 50       7 if ( defined( $args->{direct_task} ) ) {
82 0         0 $msg->{task} = $args->{direct_task};
83             }
84              
85 3         4 $msg->{anchors} = $anchors;
86              
87 3         11 $self->send_message($msg);
88              
89 3 50       10 if ( defined $msg->{task} ) {
90 0         0 return $msg->{task};
91             }
92             else {
93 3         8 return $self->read_task_ids();
94             }
95             }
96              
97             sub ack {
98 1     1 1 2567 my ( $self, $tuple ) = @_;
99 1         2 my $tup_id;
100 1 50       5 if ( ref($tuple) eq "IO::Storm::Tuple" ) {
101 1         4 $tup_id = $tuple->id;
102             }
103             else {
104 0         0 $tup_id = $tuple;
105             }
106 1         14 $self->send_message( { command => 'ack', id => $tup_id } );
107             }
108              
109             sub fail {
110 1     1 1 1154 my ( $self, $tuple ) = @_;
111 1         2 my $tup_id;
112 1 50       5 if ( ref($tuple) eq "IO::Storm::Tuple" ) {
113 1         5 $tup_id = $tuple->id;
114             }
115             else {
116 0         0 $tup_id = $tuple;
117             }
118              
119 1         5 $self->send_message( { command => 'fail', id => $tup_id } );
120             }
121              
122             sub run {
123 0     0 1   my ($self) = @_;
124 0           my $tup;
125              
126 0           my ( $storm_conf, $context ) = $self->read_handshake();
127 0           $self->_setup_component( $storm_conf, $context );
128 0           $self->initialize( $storm_conf, $context );
129              
130             try {
131 0     0     while (1) {
132 0           $tup = $self->read_tuple();
133 0           $self->_current_tups( [$tup] );
134 0 0 0       if ( $tup->{task} == -1 && $tup->{stream} eq '__heartbeat' ) {
135 0           $self->send_message( { command => 'sync' } );
136             }
137             else {
138 0           $self->process($tup);
139 0 0         if ( $self->auto_ack ) {
140 0           $self->ack($tup);
141             }
142              
143             }
144              
145             # reset so that we don't accidentally fail the wrong tuples
146             # if a successive call to read_tuple fails
147 0           $self->_current_tups( [] );
148             }
149             }
150             catch {
151 0     0     my $error = $_;
152 0 0         if ( scalar( @{ $self->_current_tups } ) == 1 ) {
  0            
153 0           $tup = $self->_current_tups->[0];
154 0 0         if ( $self->auto_fail ) {
155 0           $self->fail($tup);
156             }
157             }
158 0           $self->log("Bolt encountered exception: $_");
159 0           die("Encounter exception in Bolt: $_");
160 0           };
161             }
162              
163             1;
164              
165             __END__