File Coverage

blib/lib/POE/Wheel/UDP.pm
Criterion Covered Total %
statement 106 128 82.8
branch 33 74 44.5
condition 6 18 33.3
subroutine 13 13 100.0
pod 2 2 100.0
total 160 235 68.0


line stmt bran cond sub pod time code
1             package POE::Wheel::UDP;
2              
3             =head1 NAME
4              
5             POE::Wheel::UDP - POE Wheel for UDP handling.
6              
7             =head1 SYNOPSIS
8              
9             use POE;
10             use POE::Wheel::UDP;
11            
12             POE::Session->create(
13             inline_states => {
14             _start => sub {
15             my $wheel = $_[HEAP]->{wheel} = POE::Wheel::UDP->new(
16             LocalAddr => '10.0.0.1',
17             LocalPort => 1234,
18             PeerAddr => '10.0.0.2',
19             PeerPort => 1235,
20             InputEvent => 'input',
21             Filter => POE::Filter::Stream->new,
22             );
23             $wheel->put(
24             {
25             payload => [ 'This datagram will go to the default address.' ],
26             },
27             {
28             payload => [ 'This datagram will go to the explicit address and port I have paired with it.' ],
29             addr => '10.0.0.3',
30             port => 1236,
31             },
32             );
33             },
34             input => sub {
35             my ($input, $wheel_id) = @_[ARG0, ARG1];
36             print "Incoming datagram from $input->{addr}:$input->{port}: '@{ $input->{payload} }'\n";
37             },
38             }
39             );
40              
41             POE::Kernel->run;
42              
43             =head1 DESCRIPTION
44              
45             POE Wheel for UDP handling.
46              
47             =cut
48              
49 2     2   209874 use 5.006; # I don't plan to support old perl
  2         9  
  2         95  
50 2     2   15 use strict;
  2         6  
  2         72  
51 2     2   11 use warnings;
  2         9  
  2         95  
52              
53 2     2   12 use base 'POE::Wheel';
  2         4  
  2         2454  
54              
55 2     2   2376 use POE;
  2         63450  
  2         13  
56 2     2   96603 use Carp;
  2         5  
  2         95  
57 2     2   12 use Socket;
  2         3  
  2         1270  
58 2     2   12 use Fcntl;
  2         3  
  2         3686  
59              
60             our $VERSION = '0.02';
61             $VERSION = eval $VERSION; # see L<perlmodstyle>
62              
63             =head1 Package Methods
64              
65             =head2 $wheel = POE::Wheel::UDP->new( OPTIONS );
66              
67             Constructor for a new UDP Wheel object. OPTIONS is a key => value pair list specifying the following options:
68              
69             =over
70              
71             =item LocalAddr
72              
73             =item LocalPort
74              
75             (Required Pair)
76              
77             Specify the local IP address and port for the created socket. LocalAddr should be in dotted-quad notation,
78             and LocalPort should be an integer. This module will not resolve names to numbers at all.
79              
80             =item PeerAddr
81              
82             =item PeerPort
83              
84             (Optional Pair)
85              
86             Specify the remote IP address and port for the created socket. As above, PeerAddr should be in dotted-quad
87             notation, and PeerPort should be an integer. These arguments are used to perform a C<connect(2)> on the socket,
88             which means that outbound datagrams will be sent to this address by default AND inbound datagrams from sources
89             other than this peer will be ignored. If you want to just set a default destination for packets, use the
90             DefaultAddr and DefaultPort items instead.
91              
92             =item DefaultAddr
93              
94             =item DefaultPort
95              
96             (Optional Pair)
97              
98             Dotted quad, and integer (respectively) options for the default destination of datagrams originating from this
99             wheel. This setting takes precedence over PeerAddr and PeerPort on each put() call, but you can override it
100             by passing addr and port keys directly to the put() method.
101              
102             =item InputEvent
103              
104             (Optional)
105              
106             Specify the event to be invoked via Kernel->yield when a packet arrives on this socket. Currently all incoming
107             data is truncated to 1500 bytes. If you do not specify an event, the wheel will not ask the kernel to pass
108             incoming datagrams to it, and therefore this wheel will not hold your session alive.
109              
110             =item InputFilter
111              
112             (Required if InputEvent defined)
113              
114             Assign a POE::Filter object to the input side of this wheel.
115              
116             =item OutputFilter
117              
118             (Required if you want to call the put method)
119              
120             Assign a POE::Filter object to the output side of this wheel.
121              
122             =item Filter
123              
124             Shorthand for assigning the same filter object to both the InputFilter and OutputFilter arguments.
125              
126             =back
127              
128             =cut
129              
130             sub new {
131 2     2 1 278 my $class = shift;
132 2 50       9 carp( "Uneven set of options passed to ${class}->new." ) unless (@_ % 2 == 0);
133 2         12 my %opts = @_;
134            
135 2   33     12 my $self = bless { }, (ref $class || $class);
136              
137 2         4 my %sockopts;
138              
139 2         6 foreach (qw(LocalAddr LocalPort PeerAddr PeerPort)) {
140 8 50       24 $sockopts{$_} = delete( $opts{$_} ) if exists( $opts{$_} );
141             }
142              
143 2         9 $self->_open( %sockopts );
144              
145 2         13 my $id = $self->{id} = $self->SUPER::allocate_wheel_id();
146 2         19 my $read_event = $self->{read_event} = ref($self) . "($id) -> select read";
147 2         6 my $write_event = $self->{write_event} = ref($self) . "($id) -> select write";
148              
149 2 50 33     11 if (exists( $opts{DefaultAddr} ) or exists( $opts{DefaultPort} )) {
150 0 0       0 croak "DefaultAddr is required if DefaultPort is specified."
151             unless exists( $opts{DefaultAddr} );
152 0 0       0 croak "DefaultPort is required if DefaultAddr is specified."
153             unless exists( $opts{DefaultPort} );
154              
155 0 0       0 my $addr = inet_aton( $opts{DefaultAddr} )
156             or croak( "Supplied 'DefaultAddr' value '$opts{DefaultAddr}' caused inet_aton failure: $!" );
157              
158 0 0       0 my $spec = pack_sockaddr_in( $opts{DefaultPort}, $addr )
159             or croak( "Supplied 'DefaultPort' value '$opts{DefaultPort}' caused pack_sockaddr_in failure: $!" );
160              
161 0         0 $self->{DefaultAddr} = delete $opts{DefaultAddr};
162 0         0 $self->{DefaultPort} = delete $opts{DefaultPort};
163 0         0 $self->{default_send} = $spec;
164             }
165              
166 2 50       11 if (exists( $opts{Filter} )) {
167 2         3 my $filter = delete $opts{Filter};
168 2   33     59 $opts{InputFilter} ||= $filter;
169 2   33     31 $opts{OutputFilter} ||= $filter;
170             }
171              
172 2 50       6 if (exists( $opts{InputFilter} )) {
173 2         3 $self->{InputFilter} = delete $opts{InputFilter};
174             }
175              
176 2 50       6 if (exists( $opts{OutputFilter} )) {
177 2         4 $self->{OutputFilter} = delete $opts{OutputFilter};
178             }
179              
180 2 100       4 if (exists( $opts{InputEvent} )) {
181 1 50       5 croak "InputFilter option is required if InputEvent is defined."
182             unless exists($self->{InputFilter});
183              
184 1         3 my $filter = \$self->{InputFilter};
185            
186 1         2 my $input_event = $self->{InputEvent} = delete $opts{InputEvent};
187              
188             $poe_kernel->state( $read_event, sub {
189 100     100   71058 my ($kernel, $socket) = @_[KERNEL, ARG0];
190 100         167 $! = undef;
191 100         852 while( my $addr = recv( $socket, my $input = "", 1500, MSG_DONTWAIT ) ) {
192 100 50       183 if (defined( $addr )) {
193 100         111 my %input_data;
194              
195 100 50       205 if ($addr) {
196 100 50       463 my ($port, $addr) = unpack_sockaddr_in( $addr )
197             or warn( "sockaddr_in failure: $!" );
198 100         577 $input_data{addr} = inet_ntoa( $addr );
199 100         195 $input_data{port} = $port;
200             }
201              
202 100         167 $input_data{bytes} = length( $input );
203            
204 100         127 local $POE::Filter::DATAGRAM = 1;
205              
206 100         386 $$filter->get_one_start( [ $input ] );
207              
208 100         459 my @payload;
209 100         255 while (my $records = $$filter->get_one) {
210 200 100       2266 last unless @$records;
211 100         405 push @payload, @$records;
212             }
213            
214 100         664 $poe_kernel->yield( $input_event, {
215             payload => \@payload,
216             %input_data,
217             }, $id );
218             }
219             else {
220 0         0 warn "recv failure: $!";
221             next
222 0         0 }
223             }
224 1         13 } );
225            
226 1         25 $poe_kernel->select_read( $self->{sock}, $read_event );
227             }
228              
229             # Does anyone know if I should watch for writability on the socket at all? it's pretty hard to test
230             # to see if UDP can ever return EAGAIN because I can't get it to go fast enough to blast past the buffers.
231              
232 2 50       96 croak "Extra options passed to new(): " . join( ', ', map { "'$_'" } keys %opts )
  0         0  
233             if keys %opts;
234              
235 2         8 return $self;
236             }
237              
238             sub _open {
239 2     2   3 my $self = shift;
240 2         14 my %opts = @_;
241            
242 2         974 my $proto = getprotobyname( "udp" );
243            
244 2 50       71 socket( my $sock, PF_INET, SOCK_DGRAM, $proto )
245             or die( "socket() failure: $!" );
246              
247 2 50       13 fcntl( $sock, F_SETFL, O_NONBLOCK | O_RDWR )
248             or die( "fcntl problem: $!" );
249            
250 2 50       15 setsockopt( $sock, SOL_SOCKET, SO_REUSEADDR, 1 )
251             or die( "setsockopt SO_REUSEADDR failed: $!" );
252              
253             {
254 2 50       3 my $addr = inet_aton( $opts{LocalAddr} )
  2         31  
255             or die( "inet_aton problem: $!" );
256 2 50       11 my $sockaddr = sockaddr_in( $opts{LocalPort}, $addr )
257             or die( "sockaddr_in problem: $!" );
258 2 50       39 bind( $sock, $sockaddr )
259             or die( "bind error: $!" );
260             }
261              
262 2 50 33     14 if ($opts{PeerAddr} and $opts{PeerPort}) {
263 2 50       11 my $addr = inet_aton( $opts{PeerAddr} )
264             or die( "inet_aton problem: $!" );
265 2 50       6 my $sockaddr = sockaddr_in( $opts{PeerPort}, $addr )
266             or die( "sockaddr_in problem: $!" );
267 2 50       31 connect( $sock, $sockaddr )
268             or die( "connect error: $!" );
269             }
270              
271 2         11 return $self->{sock} = $sock;
272             }
273              
274             =head1 Object Methods
275              
276             =head2 $wheel->put( LIST )
277              
278             Returns the total number of bytes sent in this call, which may not match the number of bytes
279             you passed in for payloads due to send(2) semantics. Takes a list of hashrefs with the
280             following useful keys in them:
281              
282             =over
283              
284             =item payload
285              
286             An arrayref of records you wish to put through the filter and send in datagrams. The arrayref
287             is used to allow more than one logical record per datagram.
288              
289             =item bytes
290              
291             How many bytes were read from this datagram. This key is set on the hashrefs delivered to your InputEvent
292             handler and is ignored by put(). Currently a maximum of 1500 bytes will be read, and larger datagrams will be truncated.
293              
294             =item addr
295              
296             =item port
297              
298             Specify a destination IP address and port for this specific packet. Both must be given together. Optional if you
299             specified PeerAddr and PeerPort (or DefaultAddr and DefaultPort) in the wheel constructor; required if you did not.
300              
301             =back
302              
303             =cut
304              
305             sub put {
306 100     100 1 16351 my $self= shift;
307              
308 100         153 my $sock = $self->{sock};
309 100         103 my $total_bytes = 0;
310              
311 100         246 while (my $thing = shift) {
312 100 50       184 if (!defined( $thing )) {
313 0         0 warn "Undefined argument, ignoring";
314 0         0 next;
315             }
316              
317 100 50       233 if (ref( $thing ) ne 'HASH') {
318 0         0 warn "Non-hasref argument, ignoring";
319 0         0 next;
320             }
321              
322 100 50       205 my $payload = $thing->{payload} or die;
323              
324 100 50       203 die unless ref($payload) eq 'ARRAY';
325            
326 100         136 my $filter = $self->{OutputFilter};
327 100         243 my $records = $filter->put( $payload );
328            
329 100         407 my $bytes;
330 100 50 33     516 if (exists( $thing->{addr} ) or exists( $thing->{port} )) {
    50          
331 0 0       0 my $addr = $thing->{addr} or die;
332 0 0       0 my $port = $thing->{port} or die;
333            
334 0         0 foreach my $output (@$records) {
335 0         0 $bytes = send( $sock, $output, MSG_DONTWAIT, sockaddr_in( $port,inet_aton( $addr ) ) );
336             }
337             }
338             elsif (exists( $self->{default_send} )) {
339 0         0 my $default_send = $self->{default_send};
340 0         0 foreach my $output (@$records) {
341 0         0 $bytes = send( $sock, $output, MSG_DONTWAIT, $default_send );
342             }
343             }
344             else {
345 100         171 foreach my $output (@$records) {
346 100         2102 $bytes = send( $sock, $output, MSG_DONTWAIT );
347             }
348             }
349              
350 100 50       243 if (!defined( $bytes )) {
351 0         0 die( "send() failed: $!" );
352             # if we ever remove fatal handling of this, do the following:
353             # push current thing onto buffer.
354             # last;
355             }
356 100         350 $total_bytes += $bytes;
357             }
358              
359             # push rest of @_ onto buffer
360              
361 100         188 return $total_bytes;
362             }
363              
364             sub DESTROY {
365 2     2   1353 my $self = shift;
366 2 50       11 if ($self->{read_event}) {
367 2         12 $poe_kernel->state( delete $self->{read_event} );
368 2         60 $poe_kernel->select_read( $self->{sock} );
369             }
370 2         172 $self->SUPER::free_wheel_id( delete $self->{id} );
371             }
372              
373             sub allocate_wheel_id; # try to cancel this method from being inherited.
374             sub free_wheel_id;
375              
376             1;
377             __END__