File Coverage

blib/lib/IO/Storm/Bolt.pm
Criterion Covered Total %
statement 48 76 63.1
branch 10 22 45.4
condition 2 4 50.0
subroutine 9 14 64.2
pod 6 6 100.0
total 75 122 61.4


line stmt bran cond sub pod time code
1             # ABSTRACT: The base class for all IO::Storm Bolts.
2              
3             package IO::Storm::Bolt;
4             $IO::Storm::Bolt::VERSION = '0.15';
5             # Imports
6 1     1   128000 use strict;
  1         2  
  1         30  
7 1     1   3 use warnings;
  1         1  
  1         23  
8 1     1   8 use v5.10;
  1         2  
  1         26  
9 1     1   3 use Try::Tiny;
  1         1  
  1         50  
10              
11             # Setup Moo for object-oriented niceties
12 1     1   5 use Moo;
  1         1  
  1         7  
13 1     1   300 use namespace::clean;
  1         1  
  1         8  
14              
15              
16             extends 'IO::Storm::Component';
17              
18             # A boolean indicating whether or not the bolt should automatically
19             # anchor emits to the incoming tuple ID. Tuple anchoring is how Storm
20             # provides reliability, you can read more about tuple anchoring in Storm's
21             # docs:
22             # https://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html#what-is-storms-reliability-api
23             has 'auto_anchor' => (
24             is => 'rw',
25             default => 1
26             );
27              
28             # A boolean indicating whether or not the bolt should automatically
29             # acknowledge tuples after ``process()`` is called.
30             has 'auto_ack' => (
31             is => 'rw',
32             default => 1
33             );
34              
35             # A boolean indicating whether or not the bolt should automatically fail
36             # tuples when an exception occurs when the ``process()`` method is called.
37             has 'auto_fail' => (
38             is => 'rw',
39             default => 1
40             );
41              
42             # Using a list so Bolt and subclasses can have more than one current_tup
43             has '_current_tups' => (
44             is => 'rw',
45             default => sub { [] },
46             init_arg => undef
47             );
48              
49              
50             sub initialize {
51 0     0 1 0 my ( $self, $storm_conf, $context ) = @_;
52             }
53              
54              
55             sub process {
56 0     0 1 0 my ( $self, $tuple ) = @_;
57             }
58              
59              
60             sub emit ($$;$) {
61 3     3 1 3303 my ( $self, $tuple, $args ) = @_;
62              
63 3   50     8 $args = $args // {};
64 3         7 my $msg = { command => 'emit', tuple => $tuple };
65              
66 3         5 my $anchors = [];
67 3 50       12 if ( $self->auto_anchor ) {
68 3   50     8 $anchors = $self->_current_tups // [];
69             }
70 3 100       8 unless ( defined( $args->{anchors} ) ) {
71 2         3 $args->{anchors} = $anchors;
72             }
73              
74 3         3 my $a;
75 3         3 for $a ( @{ $args->{anchors} } ) {
  3         7  
76 2 50       5 if ( ref($a) eq "IO::Storm::Tuple" ) {
77 0         0 $a = $a->id;
78             }
79 2         5 push( @$anchors, $a );
80             }
81              
82 3 100       8 if ( defined( $args->{stream} ) ) {
83 1         1 $msg->{stream} = $args->{stream};
84             }
85              
86 3 50       8 if ( defined( $args->{direct_task} ) ) {
87 0         0 $msg->{task} = $args->{direct_task};
88             }
89              
90 3         4 $msg->{anchors} = $anchors;
91              
92 3         7 $self->send_message($msg);
93              
94 3 50       8 if ( defined $msg->{task} ) {
95 0         0 return $msg->{task};
96             }
97             else {
98 3         10 return $self->read_task_ids();
99             }
100             }
101              
102              
103             sub ack {
104 1     1 1 2669 my ( $self, $tuple ) = @_;
105 1         2 my $tup_id;
106 1 50       4 if ( ref($tuple) eq "IO::Storm::Tuple" ) {
107 1         4 $tup_id = $tuple->id;
108             }
109             else {
110 0         0 $tup_id = $tuple;
111             }
112 1         10 $self->send_message( { command => 'ack', id => $tup_id } );
113             }
114              
115              
116             sub fail {
117 1     1 1 1184 my ( $self, $tuple ) = @_;
118 1         2 my $tup_id;
119 1 50       5 if ( ref($tuple) eq "IO::Storm::Tuple" ) {
120 1         4 $tup_id = $tuple->id;
121             }
122             else {
123 0         0 $tup_id = $tuple;
124             }
125              
126 1         5 $self->send_message( { command => 'fail', id => $tup_id } );
127             }
128              
129              
130             sub run {
131 0     0 1   my ($self) = @_;
132 0           my $tup;
133              
134 0           my ( $storm_conf, $context ) = $self->read_handshake();
135 0           $self->_setup_component( $storm_conf, $context );
136 0           $self->initialize( $storm_conf, $context );
137              
138             try {
139 0     0     while (1) {
140 0           $tup = $self->read_tuple();
141 0           $self->_current_tups( [$tup] );
142 0           $self->process($tup);
143 0 0         if ( $self->auto_ack ) {
144 0           $self->ack($tup);
145             }
146              
147             # reset so that we don't accidentally fail the wrong tuples
148             # if a successive call to read_tuple fails
149 0           $self->_current_tups( [] );
150             }
151             }
152             catch {
153 0     0     my $error = $_;
154 0 0         if ( scalar( @{ $self->_current_tups } ) == 1 ) {
  0            
155 0           $tup = $self->_current_tups->[0];
156 0 0         if ( $self->auto_fail ) {
157 0           $self->fail($tup);
158             }
159             }
160 0           $self->log("Bolt encountered exception: $_");
161 0           die("Encounter exception in Bolt: $_");
162 0           };
163             }
164              
165             1;
166              
167             __END__