File Coverage

blib/lib/POE/Component/AIO.pm
Criterion Covered Total %
statement 88 106 83.0
branch 12 28 42.8
condition 3 9 33.3
subroutine 22 23 95.6
pod 4 5 80.0
total 129 171 75.4


line stmt bran cond sub pod time code
1             package POE::Component::AIO;
2              
3 1     1   283642 use IO::AIO qw( poll_fileno poll_cb );
  1         2  
  1         95  
4 1     1   5 use POE;
  1         2  
  1         7  
5              
6 1     1   354 use strict;
  1         6  
  1         32  
7 1     1   6 use warnings;
  1         1  
  1         33  
8 1     1   5 use Carp qw( croak );
  1         1  
  1         67  
9              
10             our %callback_ids;
11             our $VERSION = '1.00';
12              
13 1     1   4 use vars qw( $poco_aio );
  1         2  
  1         89  
14              
15             sub import {
16 2     2   853 my ( $class, $args ) = @_;
17 2         3 my $package = caller();
18              
19 2 50 33     8 croak "PoCo::AIO expects its arguments in a hash ref"
20             if ( $args && ref( $args ) ne 'HASH' );
21              
22 2 50       5 unless ( delete $args->{no_auto_export} ) {
23             {
24 1     1   4 no strict 'refs';
  1         1  
  1         850  
  2         2  
25 2         3 *{ $package . '::poco_aio' } = \$poco_aio;
  2         15  
26             }
27              
28 1     1   5 eval( "package $package; use IO::AIO qw( 2 );" );
  1     1   1  
  1         221  
  1         4  
  1         2  
  1         309  
  2         105  
29 2 50       13 if ( $@ ) {
30 0         0 croak "could not export IO::AIO into $package (is it installed?)";
31             }
32             }
33              
34 2 50       5 return if ( $args->{no_auto_bootstrap} );
35              
36             # bootstrap
37 2         8 POE::Component::AIO->new( %$args );
38            
39 2         15 return;
40             }
41              
42             sub new {
43 2     2 1 2 my $class = shift;
44 2 100       6 return $poco_aio if ( $poco_aio );
45              
46 1   33     8 my $self = $poco_aio = bless({
47             session_id => undef,
48             postback => undef,
49             @_
50             }, ref $class || $class );
51              
52 1         8 POE::Session->create(
53             object_states => [
54             $self => [qw(
55             _start
56             _stop
57             poll_cb
58             do_postback
59             _shutdown
60             )]
61             ],
62             );
63              
64 1         133 return $self;
65             }
66              
67             sub _start {
68 1     1   212 my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
69            
70 1         3 $self->{session_id} = $_[ SESSION ]->ID();
71            
72 1         8 $kernel->alias_set( "$self" );
73            
74 1 50       52 open( my $fh, "<&=".poll_fileno() ) or die "$!";
75 1         5 $kernel->select_read( $fh, 'poll_cb' );
76 1         87 $self->{_fh} = $fh;
77            
78 1 50       12 return unless( $self->{postback} );
79            
80             # XXX undocumented
81 0 0       0 if ( ref( $self->{postback} ) eq 'ARRAY' ) {
    0          
82 0         0 $kernel->post( @{$self->{postback}}, $self->{session_id}, "$self" );
  0         0  
83             } elsif ( ref( $self->{postback} ) eq 'CODE' ) {
84 0         0 $kernel->yield( 'do_postback' );
85             }
86            
87 0         0 return;
88             }
89              
90             sub do_postback {
91 0     0 0 0 my $self = $_[ OBJECT ];
92            
93 0         0 $self->{postback}->( $self->{session_id}, "$self" );
94            
95 0         0 return;
96             }
97              
98 1     1   254 sub _stop { }
99              
100             sub callback {
101 1     1 1 1108 my ($self, $event, @etc) = @_;
102              
103 1         3 my $id;
104 1 50       5 if ( ref( $event ) eq 'ARRAY' ) {
105 0         0 ( $id, $event ) = @$event;
106             } else {
107 1         8 my $ses = $poe_kernel->get_active_session();
108 1 50       6 if ( $ses ) {
109 1         5 $id = $ses->ID();
110             } else {
111 0         0 warn 'no active session in call to PoCo::AIO::callback';
112 0         0 return undef;
113             }
114             }
115              
116             my $callback = POE::Component::AIO::AnonCallback->new(sub {
117 1     1   14368 $poe_kernel->call( $id => $event => @etc => @_ );
118 1         20 });
119              
120 1         11 $callback_ids{$callback} = $id;
121              
122 1         7 $poe_kernel->refcount_increment( $id, 'anon_event' );
123              
124 1         221 return $callback;
125             }
126              
127             sub postback {
128 1     1 1 893 my ($self, $event, @etc) = @_;
129              
130 1         2 my $id;
131 1 50       8 if ( ref( $event ) eq 'ARRAY' ) {
132 1         3 ( $id, $event ) = @$event;
133             } else {
134 0         0 my $ses = $poe_kernel->get_active_session();
135 0 0       0 if ( $ses ) {
136 0         0 $id = $ses->ID();
137             } else {
138 0         0 warn 'no active session in call to PoCo::AIO::callback';
139 0         0 return undef;
140             }
141             }
142            
143             my $postback = POE::Component::AIO::AnonCallback->new(sub {
144 1     1   226 $poe_kernel->post( $id => $event => @etc => @_ );
145 1         17 });
146              
147 1         5 $callback_ids{$postback} = $id;
148              
149 1         7 $poe_kernel->refcount_increment( $id, 'anon_event' );
150              
151 1         74 return $postback;
152             }
153              
154             sub shutdown {
155 1     1 1 1128 $poe_kernel->call( shift->{session_id} => '_shutdown' );
156             }
157              
158             sub _shutdown {
159 1     1   62 my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
160              
161 1         25 $kernel->alias_remove( "$self" );
162 1         48 $kernel->select_read( delete $self->{_fh} );
163              
164 1         146 $poco_aio = undef;
165              
166 1         6 return;
167             }
168              
169             1;
170              
171             =pod
172              
173             =head1 NAME
174              
175             POE::Component::AIO - Asynchronous Input/Output for POE
176              
177             =head1 SYNOPSIS
178              
179             use POE qw( Component::AIO );
180              
181             ...
182              
183             aio_read( $fh, 0, 1024, $buffer, 0, $poco_aio->callback( 'open_done' ) );
184            
185             aio_read( $fh, 0, 1024, $buffer, 0, sub {
186             ...
187             } );
188              
189             =head1 DESCRIPTION
190              
191             This component adds support for L use in POE
192              
193             =head2 EXAMPLE
194              
195             use POE;
196              
197             Foo->new();
198              
199             $poe_kernel->run();
200              
201             package Foo;
202              
203             use POE qw( Component::AIO );
204             use Fcntl;
205              
206             use strict;
207             use warnings;
208              
209             sub new {
210             my $class = shift;
211              
212             my $self = bless( {}, $class );
213              
214             POE::Session->create(
215             object_states => [
216             $self => [qw(
217             _start
218             _stop
219              
220             open_done
221             read_done
222             )]
223             ]
224             );
225            
226             return $self;
227             }
228              
229             sub _start {
230             my $file = '/etc/passwd';
231            
232             aio_open( $file, O_RDONLY, 0, $poco_aio->callback( 'open_done', $file ) );
233             }
234            
235             sub open_done {
236             my ( $self, $session, $file, $fh ) = @_[ OBJECT, SESSION, ARG0, ARG1 ];
237            
238             unless ( defined $fh ) {
239             die "aio open failed on $file: $!";
240             }
241            
242             my $buffer = '';
243             # read 1024 bytes from $fh
244             aio_read( $fh, 0, 1024, $buffer, 0, $poco_aio->postback( 'read_done', \$buffer ) );
245             }
246            
247             sub read_done {
248             my ( $self, $buffer, $bytes ) = @_[ OBJECT, ARG0, ARG1 ];
249            
250             unless( $bytes > 0 ) {
251             die "aio read failed: $!";
252             }
253            
254             print $$buffer;
255             }
256            
257             sub _stop {
258             $poco_aio->shutdown();
259             }
260              
261             =head1 NOTES
262              
263             This module automaticly bootstraps itself on use(). $poco_aio is imported into your
264             namespace for easy use. Just like $poe_kernel when using L. There are two
265             import options available: no_auto_bootstrap and no_auto_export.
266              
267             Example:
268              
269             use POE::Component::AIO { no_auto_bootstrap => 1, no_auto_export => 1 };
270              
271             Also, use of this modules' callback and postback methods are completely optional.
272             They are included for convenience, but note that they don't work the same as the
273             postback and callback methods from L.
274              
275             =head1 METHODS
276              
277             =over 4
278              
279             =item new()
280              
281             Call this to get the singleton object, which is the same as $poco_aio. See the notes
282             above. You do not need to call this unless you have disabled auto bootstrapping.
283              
284             =item shutdown()
285              
286             Stop the session used by this module.
287              
288             =item callback( $event [, $params ] )
289              
290             Returns a callback. Params are optional and are stacked before params passed to the callback
291             at call time. This differs from L's callback because the params are not wrapped
292             in array references. It uses the current session to latch the callback to. If you want to
293             use another session, you can pass an array ref of the session id and event name as the event
294             param.
295              
296             Examples:
297              
298             $cb = $poco_aio->callback( 'foo' );
299             $cb = $poco_aio->callback( 'foo', $bar );
300             $cb = $poco_aio->callback( [ $session->ID(), 'foo' ] );
301             $cb = $poco_aio->callback( [ $session->ID(), 'foo' ], $bar );
302              
303             =item postback( $event [, $params ] );
304              
305             See the callback method. The only difference is that it uses a post instead of call
306              
307             =head1 SEE ALSO
308              
309             L, L
310              
311             =head1 AUTHOR
312              
313             David Davis
314             L
315              
316             =head1 LICENSE
317              
318             Artistic License
319              
320             =head1 COPYRIGHT AND LICENSE
321              
322             Copyright 2007 David Davis, and The Dojo Foundation.
323             Code was shared from the Cometd project L
324              
325             =cut
326              
327             package POE::Component::AIO::AnonCallback;
328              
329 1     1   5 use POE;
  1         1  
  1         4  
330              
331             sub new {
332 2     2   5 my $class = shift;
333              
334 2   33     20 bless( shift, ref $class || $class );
335             }
336              
337             sub DESTROY {
338 2     2   189 my $self = shift;
339 2         9 my $session_id = delete $POE::Component::AIO::callback_ids{"$self"};
340              
341 2 50       6 if ( defined $session_id ) {
342 2         9 $poe_kernel->refcount_decrement( $session_id, 'anon_event' );
343             } else {
344 0         0 warn "connection callback DESTROY without session_id to refcount_decrement";
345             }
346              
347 2         77 return;
348             }
349              
350              
351             1;
352