File Coverage

blib/lib/POE/Component/AtomAggregator.pm
Criterion Covered Total %
statement 19 179 10.6
branch 0 86 0.0
condition 0 5 0.0
subroutine 7 30 23.3
pod n/a
total 26 300 8.6


line stmt bran cond sub pod time code
1             package POE::Component::AtomAggregator;
2              
3 1     1   19227 use warnings;
  1         2  
  1         33  
4 1     1   5 use strict;
  1         1  
  1         41  
5              
6 1         7 use POE qw(
7             Component::Client::HTTP
8             Wheel::ReadWrite
9             Driver::SysRW
10 1     1   833 );
  1         65248  
11 1     1   300613 use Symbol qw( gensym );
  1         3  
  1         64  
12 1     1   899 use HTTP::Request;
  1         805  
  1         26  
13 1     1   7 use Carp qw(croak);
  1         1  
  1         2646  
14              
15             =head1 NAME
16              
17             POE::Component::AtomAggregator - Watch Muliple Atom Feeds for New Headlines
18              
19             =head1 VERSION
20              
21             Version 1.0
22              
23             =cut
24              
25             our $VERSION = '1.0';
26              
27             =head1 SYNOPSIS
28              
29             #!/usr/bin/perl
30             use strict;
31             use warnings;
32             use POE qw( Component::AtomAggregator );
33              
34             my @feeds = (
35             { url => "http://xantus.vox.com/library/posts/atom.xml",
36             name => "xantus",
37             delay => 600,
38             },
39             { url => "http://www.vox.com/explore/posts/atom.xml",
40             name => "vox",
41             delay => 60,
42             },
43             );
44              
45             POE::Session->create(
46             inline_states => {
47             _start => \&init_session,
48             handle_feed => \&handle_feed,
49             },
50             );
51              
52             $poe_kernel->run();
53              
54             sub init_session {
55             my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
56             $heap->{atomagg} = POE::Component::AtomAggregator->new(
57             alias => 'atomagg',
58             debug => 1,
59             callback => $session->postback('handle_feed'),
60             tmpdir => '/tmp', # optional caching
61             );
62             $kernel->post( 'atomagg', 'add_feed', $_ ) for @feeds;
63             }
64              
65             sub handle_feed {
66             my ( $kernel, $feed ) = ( $_[KERNEL], $_[ARG1]->[0] );
67             for my $entry ( $feed->late_breaking_news ) {
68            
69             # this is where this module differs from RSSAggregator!
70            
71             # do stuff with the XML::Atom::Entry object
72             print $entry->title . "\n";
73             }
74             }
75              
76             =head1 CONSTRUCTORS
77              
78             =head2 POE::Component::AtomAggregator->new( %hash );
79              
80             Create a new instace of PoCo::AtomAggregator.
81              
82             =over 4
83              
84             =item * alias
85              
86             POE alias to use for your instance of PoCo::AtomAggregator.
87              
88             =item * debug
89              
90             Boolean value to turn on verbose output.
91              
92             =item * tmpdir
93              
94             The tmpdir argument is used as the directory to cache Atom
95             between fetches (and instances).
96              
97             =item * http_alias
98              
99             Optional. Alias of an existing PoCo::Client::HTTP.
100              
101             =item * follow_redirects
102              
103             Optional. Only if you don't have an exiting PoCo::Client::HTTP.
104             Argument is passed to PoCoCl::HTTP to tell it the follow redirect
105             level. (Defaults to 2)
106              
107             =back
108              
109             =cut
110              
111             sub new {
112 0     0     my $class = shift;
113 0 0         croak __PACKAGE__ . "->new() params must be a hash" if @_ % 2;
114 0           my %params = @_;
115              
116 0 0         croak __PACKAGE__
117             . "->new() feeds param has been deprecated, use add_feed"
118             if $params{feeds};
119              
120 0           my $self = bless \%params, $class;
121 0           $self->_init();
122              
123 0           return $self;
124             }
125              
126             sub _start {
127 0     0     my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
128 0 0         $self->{alias} = 'atomagg' unless $self->{alias};
129 0           $kernel->alias_set( $self->{alias} );
130             }
131              
132 0     0     sub _stop {}
133              
134             sub _init {
135 0     0     my ($self) = @_;
136              
137 0 0         unless ($self->{http_alias}) {
138 0           $self->{http_alias} = 'ua';
139 0   0       $self->{follow_redirects} ||= 2;
140 0           POE::Component::Client::HTTP->spawn(
141             Alias => $self->{http_alias},
142             Timeout => 60,
143             FollowRedirects => $self->{follow_redirects},
144             Agent => 'Mozilla/5.0 (PoCo Atom Aggregator)',
145             );
146             }
147              
148 0           my $session = POE::Session->create(
149             object_states => [
150             $self => [qw(
151             _start
152             add_feed
153             remove_feed
154             pause_feed
155             resume_feed
156             _fetch
157             _response
158             shutdown
159             _stop
160              
161             _read_file
162             _file_read_input
163             _file_read_flush
164             _file_read_error
165              
166             _write_file
167             _file_write_flush
168             _file_write_error
169             )],
170             ],
171             );
172              
173 0           $self->{sid} = $session->ID();
174              
175 0           undef;
176             }
177              
178             sub _create_feed_object {
179 0     0     my ( $self, $feed_hash ) = @_;
180              
181 0 0         warn "[$feed_hash->{name}] Creating Feed object\n"
182             if $self->{debug};
183              
184 0 0 0       if ( exists $self->{tmpdir} && -d $self->{tmpdir} ) {
185 0           $feed_hash->{tmpdir} = $self->{tmpdir};
186             # effing windows?
187 0 0         $feed_hash->{tmpdir} .= "/"
188             unless ( $feed_hash->{tmpdir} =~ m!/$! );
189             }
190              
191 0 0         $feed_hash->{debug} = $self->{debug}
192             if $self->{debug};
193            
194 0 0         $feed_hash->{ignore_first} = $self->{ignore_first}
195             if $self->{ignore_first};
196              
197 0           $feed_hash->{_parent_sid} = $self->{sid};
198              
199 0 0         if ( my $atomfeed = POE::Component::AtomAggregator::Feed->new( $feed_hash ) ) {
200 0           $self->{feed_objs}{ $atomfeed->name } = $atomfeed;
201             } else {
202 0           warn "[$feed_hash->{name}] !! Error attempting to "
203             . "create Feed object\n";
204             }
205 0           return $feed_hash;
206             }
207              
208             =head1 METHODS
209              
210             =head2 $atomagg->feed_list
211              
212             Returns the current feeds as an array or array_ref.
213              
214             =cut
215              
216             sub feed_list {
217 0     0     my ($self) = @_;
218 0           my @feeds = map { $self->{feed_objs}{$_} } keys %{ $self->{feed_objs} };
  0            
  0            
219 0 0         return wantarray ? @feeds : \@feeds;
220             }
221              
222             =head2 $atomagg->feeds
223              
224             Returns a hash ref of feeds with the key being the feeds name.
225              
226             =cut
227              
228             sub feeds {
229 0     0     my ($self) = @_;
230 0           return $self->{feed_objs};
231             }
232              
233             =head2 $atomagg->feed( $feed_name )
234              
235             Accessor to access a the XML::Atom::Feed object via a feed's name.
236              
237             =cut
238              
239             sub feed {
240 0     0     my ( $self, $name ) = @_;
241 0 0         return exists $self->{feed_objs}{$name}
242             ? $self->{feed_objs}{$name}
243             : undef;
244             }
245              
246             =head2 $atomagg->add_feed( $hash_ref )
247              
248             The hash reference you pass in to add_feed is passed to
249             XML::Atom::Feed->new($hash_ref). ( see L )
250              
251             =cut
252              
253             sub add_feed {
254 0     0     my ( $self, $kernel, $feed_hash ) = @_[ OBJECT, KERNEL, ARG0 ];
255 0 0         if ( exists $self->{feed_objs}{ $feed_hash->{name} } ) {
256 0           warn "[$feed_hash->{name}] !! Add Failed: Feed name already exists\n";
257 0           return;
258             }
259 0 0         warn "[$feed_hash->{name}] Added\n" if $self->{debug};
260 0           $self->_create_feed_object($feed_hash);
261            
262 0 0         if ( $self->{tmpdir} ) {
263 0           my $file = $feed_hash->{tmpdir}.$feed_hash->{name}.".atom";
264 0 0         if ( -e $file ) {
265             # wheel read write
266             $poe_kernel->yield( _read_file => $feed_hash => sub {
267 0     0     my $f = shift;
268 0           delete $feed_hash->{pending_open};
269 0 0         if ( $f->{in} ) {
270 0           $feed_hash->parse( $f->{in}, 1 );
271             }
272 0           $kernel->yield( '_fetch', $feed_hash->{name} );
273 0           } );
274 0           return;
275             }
276             }
277             # Test to remove it after 10 seconds
278 0           $kernel->yield( '_fetch', $feed_hash->{name} );
279             }
280              
281             =head2 $atomagg->remove_feed( $feed_name )
282              
283             Pass in the name of the feed you want to remove.
284              
285             =cut
286              
287             sub remove_feed {
288 0     0     my ( $self, $kernel, $name ) = @_[ OBJECT, KERNEL, ARG0 ];
289 0 0         unless ( exists $self->{feed_objs}{$name} ) {
290 0           warn "[$name] remove_feed: Remove Failed: Unknown feed\n";
291 0           return;
292             }
293 0           $kernel->call( $self->{alias}, 'pause_feed', $name );
294 0           delete $self->{feed_objs}{$name};
295 0 0         warn "[$name] remove_feed: Removed Atom Feed\n" if $self->{debug};
296             }
297              
298             =head2 $atomagg->pause_feed( $feed_name )
299              
300             Pass in the name of the feed you want to pause.
301              
302             =cut
303              
304             sub pause_feed {
305 0     0     my ( $self, $kernel, $name ) = @_[ OBJECT, KERNEL, ARG0 ];
306 0 0         unless ( exists $self->{feed_objs}{$name} ) {
307 0           warn "[$name] pause_feed: Pause Failed: Unknown feed\n";
308 0           return;
309             }
310 0 0         unless ( exists $self->{alarm_ids}{$name} ) {
311 0           warn "[$name] pause_feed: Pause Failed: Feed currently on pause\n";
312 0           return;
313             }
314 0 0         if ( $kernel->alarm_remove( $self->{alarm_ids}{$name} ) ) {
315 0           delete $self->{alarm_ids}{$name};
316 0 0         warn "[$name] pause_feed: Paused Atom Feed\n" if $self->{debug};
317             }
318             else {
319 0 0         warn "[$name] pause_feed: Failed to Pause Atom Feed\n"
320             if $self->{debug};
321             }
322             }
323              
324             =head2 $atomagg->resume_feed( $feed_name )
325              
326             Pass in the name of the feed you want to resume (that you previously paused).
327              
328             =cut
329              
330             sub resume_feed {
331 0     0     my ( $self, $kernel, $name ) = @_[ OBJECT, KERNEL, ARG0 ];
332 0 0         unless ( exists $self->{feed_objs}{$name} ) {
333 0           warn "[$name] resume_feed: Resume Failed: Unknown feed\n";
334 0           return;
335             }
336 0 0         if ( exists $self->{alarm_ids}{$name} ) {
337 0           warn "[$name] resume_feed: Resume Failed: Feed currently active\n";
338 0           return;
339             }
340 0 0         warn "[$name] resume_feed: Resumed Atom Feed\n" if $self->{debug};
341 0           $kernel->yield( '_fetch', $name );
342             }
343              
344             =head2 $atomagg->shutdown
345              
346             Shutdown the instance of PoCo::AtomAggregator.
347              
348             =cut
349              
350             sub shutdown {
351 0     0     my ( $self, $kernel, $session ) = @_[ OBJECT, KERNEL, SESSION ];
352 0           for my $feed ( $self->feed_list ) {
353 0           $kernel->call( $session => 'remove_feed' => $feed->name );
354             }
355 0           delete $self->{callback};
356 0           $kernel->alias_remove( $self->{alias} );
357 0 0         warn "shutdown: shutting down atomaggregator\n" if $self->{debug};
358             }
359              
360             sub _fetch {
361 0     0     my ( $self, $kernel, $feed_name ) = @_[ OBJECT, KERNEL, ARG0 ];
362 0 0         unless ( exists $self->{feed_objs}{$feed_name} ) {
363 0           warn "[$feed_name] Unknown Feed\n";
364 0           return;
365             }
366              
367 0           my $atomfeed = $self->{feed_objs}{$feed_name};
368 0           my $req = HTTP::Request->new( GET => $atomfeed->url );
369 0 0         warn "[" . $atomfeed->name . "] Attempting to fetch\n" if $self->{debug};
370 0           $kernel->post( $self->{http_alias}, 'request', '_response', $req,
371             $atomfeed->name );
372 0           $self->{alarm_ids}{ $atomfeed->name }
373             = $kernel->delay_set( '_fetch', $atomfeed->delay, $atomfeed->name );
374             }
375              
376             sub _response {
377 0     0     my ( $self, $kernel, $request_packet, $response_packet )
378             = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
379              
380 0           my ( $req, $feed_name ) = @$request_packet;
381              
382 0 0         unless ( exists $self->{feed_objs}{$feed_name} ) {
383 0           warn "[$feed_name] Unknown Feed\n";
384 0           return;
385             }
386              
387 0           my $atomfeed = $self->{feed_objs}{$feed_name};
388 0           my $res = $response_packet->[0];
389 0 0         if ( $res->is_success ) {
390 0 0         warn "[" . $atomfeed->name . "] Fetched " . $atomfeed->url . "\n"
391             if $self->{debug};
392            
393 0 0         $self->{callback}->($atomfeed) if $atomfeed->parse( $res->content );
394             } else {
395 0           warn "[!!] Failed to fetch " . $req->uri . "\n";
396             }
397             }
398              
399             sub _read_file {
400 0     0     my ( $self, $kernel, $feed ) = @_[OBJECT, KERNEL, ARG0];
401            
402 0           my $filename = $feed->tmpdir.$feed->name.".atom";
403 0           my $fh = gensym();
404 0           open($fh,$filename);
405              
406 0           my $wheel = POE::Wheel::ReadWrite->new(
407             Handle => $fh,
408             Driver => POE::Driver::SysRW->new(),
409             Filter => POE::Filter::Stream->new(),
410             InputEvent => '_file_read_input',
411             FlushedEvent => '_file_read_flush',
412             ErrorEvent => '_file_read_error',
413             );
414 0           my $wid = $wheel->ID;
415 0 0         warn "started wheel id $wid" if ($self->{debug});
416              
417 0           $self->{wheels}->{$wid} = {
418             name => $feed->name,
419             obj => $wheel,
420             file => $filename,
421             callback => $_[ARG1]
422             };
423              
424 0           undef;
425             }
426              
427             sub _file_read_input {
428 0     0     my ($self, $wid) = @_[OBJECT, ARG1];
429 0           my $f = $self->{wheels}->{$wid};
430 0 0         warn "[$f->{name}][read] input on wheel $wid : $f->{file}" if ($self->{debug});
431 0           $f->{in} .= $_[ARG0];
432             }
433              
434             sub _file_read_flush {
435 0     0     my ($self, $wid) = @_[OBJECT, ARG0];
436 0 0         return unless($self->{debug});
437 0           my $f = $self->{wheels}->{$wid};
438 0           warn "[$f->{name}][read] file flushed";
439             }
440              
441             sub _file_read_error {
442 0     0     my ($self, $name, $num, $desc, $wid) = @_[ OBJECT, ARG0 .. ARG3 ];
443 0           my $f = delete $self->{wheels}->{$wid};
444 0 0         warn "[$f->{name}][read] file $name error $num : $desc on wheel $wid" if ($self->{debug});
445 0 0         if ($f->{callback}) {
446 0           delete $f->{obj};
447 0           $f->{error} = $num;
448 0           $f->{callback}->( $f );
449             }
450 0           undef;
451             }
452              
453             sub _write_file {
454 0     0     my ( $self, $kernel, $feed, $contents ) = @_[OBJECT, KERNEL, ARG0, ARG1];
455            
456 0           my $filename = $feed->tmpdir.$feed->name.".atom";
457 0           my $fh = gensym();
458 0           open($fh,">$filename");
459              
460 0           my $wheel = POE::Wheel::ReadWrite->new(
461             Handle => $fh,
462             Driver => POE::Driver::SysRW->new(),
463             Filter => POE::Filter::Stream->new(),
464             FlushedEvent => '_file_write_flush',
465             ErrorEvent => '_file_write_error',
466             );
467              
468 0           $self->{wheels}->{$wheel->ID} = {
469             name => $feed->name,
470             obj => $wheel,
471             file => $filename,
472             callback => $_[ARG2],
473             };
474              
475 0           $wheel->put( $contents );
476              
477 0           undef;
478             }
479              
480             sub _file_write_flush {
481 0     0     my ( $self, $wid ) = @_[OBJECT, ARG0];
482 0           my $f = delete $self->{wheels}->{$wid};
483 0 0         warn "[$f->{name}][write] flush on $f->{file}" if ($self->{debug});
484 0 0         if ($f->{callback}) {
485 0           delete $f->{obj};
486 0           $f->{callback}->( $f );
487             }
488 0           undef;
489             }
490              
491             sub _file_write_error {
492 0     0     my ($self, $name, $num, $desc, $wid) = @_[ OBJECT, ARG0 .. ARG3 ];
493 0           my $f = delete $self->{wheels}->{$wid};
494 0 0         warn "[$f->{name}][write] file $name $num on $f->{file} : $f->{file}" if ($self->{debug});
495 0 0         if ($f->{callback}) {
496 0           delete $f->{obj};
497 0           $f->{error} = $num;
498 0           $f->{callback}->( $f );
499             }
500 0           undef;
501             }
502              
503             =head1 AUTHOR
504              
505             David Davis, aka Xantus, C<< >>
506              
507             =head1 BUGS
508              
509             Please report any bugs or feature requests to
510             C, or through the web
511             interface at
512             L.
513              
514             =head1 SUPPORT
515              
516             You can find documentation for this module with the perldoc command.
517              
518             perldoc POE::Component::AtomAggregator
519              
520             You can also look for information at:
521              
522             =over 4
523              
524             =item * AnnoCPAN: Annotated CPAN documentation
525              
526             L
527              
528             =item * CPAN Ratings
529              
530             L
531              
532             =item * RT: CPAN's request tracker
533              
534             L
535              
536             =item * Search CPAN
537              
538             L
539              
540             =back
541              
542             =head1 NOTES
543              
544             All XML::Atom::Feed objects mentioned in this doc are actually
545             POE::Component::AtomAggregator::Feed objects that have extra accessors and
546             methods to add late_breaking_news functionality and non blocking file IO.
547             You can use the object as if it were a XML::Atom::Feed object.
548              
549             =head1 ACKNOWLEDGEMENTS
550              
551             A big thank you to Jeff Bisbee for POE::Component::RSSAggregator
552              
553             This module entirely based off his work, with changes to use XML::Atom
554             instead of XML::RSS
555              
556             Also a big thanks to miyagawa for XML::Atom::Feed.
557              
558             =head1 COPYRIGHT & LICENSE
559              
560             Copyright 2006 David Davis, aka Xantus
561              
562             All rights reserved.
563              
564             This program is free software; you can redistribute it and/or modify it
565             under the same terms as Perl itself.
566              
567             =head1 SEE ALSO
568              
569             L, L
570              
571             =cut
572              
573             1;
574              
575             # TODO move this?
576             package POE::Component::AtomAggregator::Feed;
577              
578 1     1   350 use XML::Atom::Feed;
  0            
  0            
579             use Carp qw( croak );
580             use POE;
581              
582             our $AUTOLOAD;
583              
584             our %accessors = map { $_ => 1 } qw(
585             url
586             name
587             delay
588             tmpdir
589             ignore_first
590             );
591              
592             # autoload that calls methods on XML::Atom::Feed
593             sub AUTOLOAD {
594             my $self = shift;
595             my $type = ref($self)
596             or croak "$self is not an object";
597              
598             my $name = $AUTOLOAD;
599             $name =~ s/.*://;
600              
601             if ($accessors{$name}) {
602             return $self->{$name};
603             }
604              
605             if ($self->{obj} && $self->{obj}->can( $name ) ) {
606             no strict 'refs';
607             return $self->{obj}->$name(@_);
608             }
609            
610              
611             croak "method not found `$name' in class $type";
612             }
613              
614             sub new {
615             my $class = shift;
616             my $obj = shift;
617             $obj->{entries} = [];
618             my $self = bless($obj, $class);
619              
620             $self;
621             }
622              
623             sub late_breaking_news {
624             @{shift->{entries}};
625             }
626              
627             sub parse {
628             my ( $self, $content, $no_write ) = @_;
629            
630             return 0 if ($self->{pending_open});
631            
632             # using the last obj $self->{obj} diff the feeds
633             my $feed = XML::Atom::Feed->new( \$content );
634            
635             # TODO better diff detection
636             my %entries;
637             if ( $self->{obj} ) {
638             %entries = map { $_->link->href => 1 } $self->entries;
639             }
640              
641             my @diff = grep { !exists( $entries{ $_->link->href } ) } $feed->entries;
642              
643             if ( $self->ignore_first && !$self->{obj} ) {
644             $self->{obj} = $feed;
645             return 0;
646             }
647            
648             $self->{obj} = $feed;
649             $self->{entries} = \@diff;
650            
651             unless ($no_write) {
652             if ( @diff ) {
653             $poe_kernel->post( $self->{_parent_sid} => _write_file => $self => $content => sub {
654             my $f = shift;
655             warn "[$f->{name}] finished writing $f->{file}" if ($self->{debug});
656             } );
657             }
658             }
659              
660             return @diff ? scalar(@diff) : 0;
661             }
662              
663             1;
664