File Coverage

blib/lib/POE/Component/Server/Syslog/UDP.pm
Criterion Covered Total %
statement 73 99 73.7
branch 12 28 42.8
condition 3 6 50.0
subroutine 13 16 81.2
pod 4 6 66.6
total 105 155 67.7


line stmt bran cond sub pod time code
1             # $Id: UDP.pm 449 2004-12-27 01:37:33Z sungo $
2             package POE::Component::Server::Syslog::UDP;
3              
4 5     5   684556 use warnings;
  5         12  
  5         197  
5 5     5   33 use strict;
  5         10  
  5         520  
6              
7             our $VERSION = '1.20';
8              
9             sub BINDADDR () { '0.0.0.0' }
10             sub BINDPORT () { 514 }
11             sub DATAGRAM_MAXLEN () { 1024 } # syslogd defaults to this. as do most
12             # libc implementations of syslog
13              
14 5     5   3396 use Params::Validate qw(validate_with);
  5         35334  
  5         387  
15 5     5   33 use Carp qw(carp croak);
  5         11  
  5         260  
16              
17 5     5   25 use POE;
  5         13  
  5         37  
18 5     5   3654 use POE::Filter::Syslog;
  5         25  
  5         129  
19              
20 5     5   46 use Socket;
  5         9  
  5         4577  
21 5     5   34 use IO::Socket::INET;
  5         7  
  5         84  
22              
23             sub spawn {
24 3     3 1 505 my $class = shift;
25              
26             my %args = validate_with(
27             params => \@_,
28             spec => {
29             InputState => {
30             type => &Params::Validate::CODEREF,
31             optional => 1,
32 0     0   0 default => sub {},
33             },
34             ErrorState => {
35             type => &Params::Validate::CODEREF,
36             optional => 1,
37 0     0   0 default => sub {},
38             },
39 3         301 BindAddress => {
40             type => &Params::Validate::SCALAR,
41             optional => 1,
42             default => BINDADDR,
43             },
44             BindPort => {
45             type => &Params::Validate::SCALAR,
46             optional => 1,
47             default => BINDPORT,
48             },
49             MaxLen => {
50             type => &Params::Validate::SCALAR,
51             optional => 1,
52             default => DATAGRAM_MAXLEN,
53             },
54             Alias => {
55             type => &Params::Validate::SCALAR,
56             optional => 1,
57             },
58             },
59             );
60              
61 3         46 $args{type} = 'udp';
62 3         33 $args{filter} = POE::Filter::Syslog->new();
63              
64 3         82 my $sess = POE::Session->create(
65             inline_states => {
66             _start => \&socket_start,
67             _stop => \&shutdown,
68              
69             select_read => \&select_read,
70             register => \®ister,
71             unregister => \&unregister,
72             shutdown => \&shutdown,
73              
74             client_input => $args{InputState},
75             client_error => $args{ErrorState},
76              
77             },
78             heap => \%args,
79             );
80              
81 3         524 return $sess;
82             }
83              
84              
85             # This is a really good spot to discuss why this is using IO::Socket
86             # instead of a POE wheel of some variety for this. The answer, for once
87             # in my life, is pretty simple. POE::Wheel::SocketFactory doesn't support
88             # connectionless sockets as of the time of writing. In this scenario,
89             # there is no chance of IO::Socket blocking, unless IO::Socket decides
90             # to lose its mind. If it does THAT, there's not a whole hell of a lot
91             # left that's right in the world. :) except maybe pizza. well, good
92             # pizza like you find at Generous George's in Alexandria, VA. and rum.
93             # pretty much any rum. Um, but anyway...
94              
95             sub socket_start {
96 3     3 0 778 $_[HEAP]->{handle} = IO::Socket::INET->new(
97             Blocking => 0,
98             LocalAddr => $_[HEAP]->{BindAddress},
99             LocalPort => $_[HEAP]->{BindPort},
100             Proto => 'udp',
101             ReuseAddr => 1,
102             SocketType => SOCK_DGRAM,
103             );
104              
105 3 50       1238 if (defined $_[HEAP]->{handle}) {
106 3         24 $_[KERNEL]->select_read( $_[HEAP]->{handle}, 'select_read' );
107             } else {
108 0         0 croak "Unable to create UDP Listener: $!";
109             }
110 3 100       330 $_[KERNEL]->alias_set( $_[HEAP]->{Alias} ) if $_[HEAP]->{Alias};
111 3         58 return;
112             }
113              
114             sub select_read {
115 3     3 0 1505323 my $message;
116 3         39 my $remote_socket = $_[HEAP]->{handle}->recv($message, $_[HEAP]->{MaxLen}, 0 );
117 3 50       119 if (defined $message) {
118 3         32 $_[HEAP]->{filter}->get_one_start([ $message ]);
119 3         8 my $records = [];
120 3   66     23 while( ($records = $_[HEAP]->{filter}->get_one()) and (@$records > 0)) {
121 3 50 33     26 if(defined $records and ref $records eq 'ARRAY') {
122 3         10 foreach my $record (@$records) {
123 3 50       22 if (my $addr = (sockaddr_in($remote_socket))[1]) {
124 3         67 $record->{addr} = inet_ntoa($addr);
125 3 50       4723 if (my $host = gethostbyaddr($addr, AF_INET)) {
126 3         21 $record->{host} = $host;
127             }
128             }
129              
130 3         41 $_[KERNEL]->yield( 'client_input', $record );
131 3         42 $_[KERNEL]->post( $_, $_[HEAP]->{sessions}->{$_}->{inputevent}, $record )
132 3         434 for keys %{ $_[HEAP]->{sessions} };
133             }
134             } else {
135 0         0 $_[KERNEL]->yield( 'client_error', $message );
136 0         0 $_[KERNEL]->post( $_, $_[HEAP]->{sessions}->{$_}->{errorevent}, $message )
137 0         0 for grep { defined $_[HEAP]->{sessions}->{errorevent} }
  0         0  
138             keys %{ $_[HEAP]->{sessions} };
139             }
140             }
141             }
142 3         21 return;
143             }
144              
145             sub shutdown {
146 4     4 1 9721 my ($kernel,$heap) = @_[KERNEL,HEAP];
147 4 100       25 if($heap->{handle}) {
148 3         22 $kernel->select_read($heap->{handle});
149 3         771 $heap->{handle}->close();
150             }
151 4         166 delete $heap->{handle};
152 4         21 $kernel->alarm_remove_all();
153 4         209 $kernel->alias_remove( $_ ) for $kernel->alias_list();
154 4         26 $kernel->refcount_decrement( $_, __PACKAGE__ )
155 4         192 for keys %{ $heap->{sessions} };
156 4         88 return;
157             }
158              
159             sub register {
160 1     1 1 463 my ($kernel,$self,$sender) = @_[KERNEL,HEAP,SENDER];
161 1         5 my $sender_id = $sender->ID();
162 1         4 my %args;
163 1 50       6 if ( ref $_[ARG0] eq 'HASH' ) {
    0          
164 1         1 %args = %{ $_[ARG0] };
  1         30  
165             }
166             elsif ( ref $_[ARG0] eq 'ARRAY' ) {
167 0         0 %args = @{ $_[ARG0] };
  0         0  
168             }
169             else {
170 0         0 %args = @_[ARG0..$#_];
171             }
172 1         8 $args{lc $_} = delete $args{$_} for keys %args;
173 1 50       5 unless ( $args{inputevent} ) {
174 0         0 warn "No 'inputevent' argument supplied\n";
175 0         0 return;
176             }
177 1 50       4 if ( defined $self->{sessions}->{ $sender_id } ) {
178 0         0 $self->{sessions}->{ $sender_id } = \%args;
179             }
180             else {
181 1         3 $self->{sessions}->{ $sender_id } = \%args;
182 1         8 $kernel->refcount_increment( $sender_id, __PACKAGE__ );
183             }
184 1         32 return;
185             }
186              
187             sub unregister {
188 0     0 1   my ($kernel,$self,$sender) = @_[KERNEL,HEAP,SENDER];
189 0           my $sender_id = $sender->ID();
190 0           my %args;
191 0 0         if ( ref $_[ARG0] eq 'HASH' ) {
    0          
192 0           %args = %{ $_[ARG0] };
  0            
193             }
194             elsif ( ref $_[ARG0] eq 'ARRAY' ) {
195 0           %args = @{ $_[ARG0] };
  0            
196             }
197             else {
198 0           %args = @_[ARG0..$#_];
199             }
200 0           $args{lc $_} = delete $args{$_} for keys %args;
201 0           my $data = delete $self->{sessions}->{ $sender_id };
202 0 0         $kernel->refcount_decrement( $sender_id, __PACKAGE__ ) if $data;
203 0           return;
204             }
205              
206             1;
207             __END__