File Coverage

blib/lib/POE/Component/Client/eris.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             # ABSTRACT: POE Component for reading eris events
2              
3             package POE::Component::Client::eris;
4              
5 1     1   127784 use warnings;
  1         3  
  1         50  
6 1     1   8 use strict;
  1         3  
  1         35  
7 1     1   7 use Carp;
  1         3  
  1         108  
8 1     1   20417 use Parse::Syslog::Line;
  0            
  0            
9              
10             use POE qw(
11             Component::Client::TCP
12             Filter::Stream
13             );
14              
15             our $VERSION = '1.4';
16              
17              
18             sub spawn {
19             my $type = shift;
20              
21             #
22             # Param Setup
23             my %args = (
24             RemoteAddress => 'localhost',
25             RemotePort => 9514,
26             Alias => 'eris_client',
27             ReturnType => 'hash',
28             Subscribe => undef,
29             Match => undef,
30             MessageHandler => undef,
31             @_
32             );
33              
34             my $block = $args{ReturnType} eq 'block';
35             my $separator = $block ? "\n" : '';
36              
37             #
38             # Build the client connection
39             my $tcp_sessid = POE::Component::Client::TCP->new(
40             Alias => $args{Alias},
41             RemoteAddress => $args{RemoteAddress},
42             RemotePort => $args{RemotePort},
43             Filter => $block ? 'POE::Filter::Stream' : 'POE::Filter::Line',
44             Connected => sub {
45             my ($kernel,$heap) = @_[KERNEL,HEAP];
46             $heap->{readyState} = 0;
47             $heap->{connected} = 0;
48             $heap->{buffer} = '';
49             $kernel->delay( 'do_setup_pipe' => 1 );
50             },
51             ConnectError => sub {
52             my ($kernel,$syscall,$errid,$errstr) = @_[KERNEL,ARG0,ARG1,ARG2];
53             carp "Connection Error ($errid) at $syscall: $errstr\n";
54             $kernel->delay('reconnect' => 10);
55             },
56             Disconnected => sub {
57             my ($kernel,$heap) = @_[KERNEL,HEAP];
58             $kernel->delay('reconnect' => 10);
59             },
60             ServerError => sub {
61             my ($kernel,$syscall,$errid,$errstr) = @_[KERNEL,ARG0,ARG1,ARG2];
62             carp "Server Error ($errid) at $syscall: $errstr\n";
63             $kernel->delay('reconnect' => 5);
64             },
65             #
66             # Handle messages from the server.
67             # Set readyState = 1 if applicable
68             # Call the inline states:
69             # handle_message (successful)
70             # handle_unknown (out of order input)
71             ServerInput => sub {
72             my ($kernel,$heap,$instr) = @_[KERNEL,HEAP,ARG0];
73             chomp $instr unless $block;
74             if( $heap->{readyState} == 1 ) {
75             $kernel->yield('handle_message' => $instr);
76             }
77             elsif( $heap->{connected} == 1 ) {
78             if( $instr =~ /^Subscribed to \:/ ) {
79             $heap->{readyState} = 1;
80             }
81             elsif( $instr =~ /^Receiving / ) {
82             $heap->{readyState} = 1;
83             }
84             elsif( $instr =~ /^Full feed enabled/ ) {
85             $heap->{readyState} = 1;
86             }
87             else {
88             $kernel->yield( 'handle_unknown' => $instr );
89             }
90             }
91             elsif( $instr =~ /^EHLO Streamer/ ) {
92             $heap->{connected} = 1;
93             }
94             else {
95             $kernel->yield( 'handle_unknown' => $instr );
96             }
97             },
98             #
99             # Inline States
100             InlineStates => {
101             do_setup_pipe => sub {
102             my ($kernel,$heap) = @_[KERNEL,HEAP];
103              
104             # Parse for Subscriptions or Matches
105             my %data = ();
106             foreach my $target (qw(Subscribe Match)) {
107             if( exists $args{$target} && defined $args{$target} ) {
108             my @data = ref $args{$target} eq 'ARRAY' ? @{ $args{$target} } : $args{$target};
109             @data = map { lc } @data if $target eq 'Subscribe';
110             next unless scalar @data > 0;
111             $data{$target} = \@data;
112             }
113             }
114              
115             # Check to make sure we're doing something
116             croak "Must specify a subscription or a match parameter!\n" unless keys %data;
117              
118             # Send the Subscription
119             foreach my $target (sort { $a cmp $b } keys %data) {
120             my $subname = "do_" . lc $target;
121             $kernel->yield( $subname => $data{$target} );
122             }
123             },
124             do_subscribe => sub {
125             my ($kernel,$heap,$subs) = @_[KERNEL,HEAP,ARG0];
126              
127             if( grep /^fullfeed$/, @{ $subs } ) {
128             $heap->{server}->put('fullfeed' . $separator);
129             }
130             else {
131             $heap->{server}->put('sub ' . join(', ', @{ $subs }) . $separator );
132             }
133             },
134             do_match => sub {
135             my ($kernel,$heap,$matches) = @_[KERNEL,HEAP,ARG0];
136              
137             $heap->{server}->put('match ' . join(', ', @{ $matches }) . $separator );
138             },
139             handle_message => sub {
140             my ($kernel,$heap,$instr) = @_[KERNEL,HEAP,ARG0];
141              
142             my $msg = undef;
143             if( $args{ReturnType} eq 'string' ) {
144             $msg = $instr;
145             }
146             elsif( $args{ReturnType} eq 'block' ) {
147             my $index = rindex $instr, "\n";
148              
149             if( $index == -1 ) {
150             $heap->{buffer} .= $instr;
151             return;
152             }
153             else {
154             $msg = $heap->{buffer} . substr $instr, 0, $index + 1;
155             $heap->{buffer} = substr $instr, $index + 1;
156             }
157             }
158             else {
159             eval {
160             no warnings;
161             $msg = parse_syslog_line($instr);
162             };
163             if($@ || !defined $msg) {
164             return;
165             }
166             }
167              
168             if( ref $args{MessageHandler} ne 'CODE' ) {
169             croak "You need to specify a subroutine reference to the 'MessageHandler' parameter.\n";
170             }
171             # Try the Message Handler, eventually we can do statistics here.
172             eval {
173             $args{MessageHandler}->( $msg );
174             };
175             },
176             handle_unknown => sub {
177             my ($kernel,$heap,$msg) = @_[KERNEL,HEAP,ARG0];
178              
179             carp "UNKNOWN INPUT: $msg\n";
180             },
181             },
182             );
183              
184             #
185             # Return the TCP Session ID
186             return $tcp_sessid;
187             }
188              
189              
190             1; # End of POE::Component::Client::eris
191              
192             __END__