File Coverage

blib/lib/POE/Component/Spread.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package POE::Component::Spread;
2              
3 2     2   386659 use strict;
  2         5  
  2         72  
4 2     2   9 use vars qw($VERSION @ISA @EXPORT);
  2         4  
  2         117  
5 2     2   10 use Exporter;
  2         9  
  2         134  
6              
7             $VERSION = "0.02";
8             @ISA = qw(Exporter);
9             @EXPORT = qw(REGULAR_MESS);
10              
11 2     2   16962 use Data::Dumper;
  2         26663  
  2         186  
12 2     2   1194 use POE::Driver::Spread;
  0            
  0            
13             use POE::Filter::Spread;
14             use Spread qw(:MESS);
15             use POE qw( Wheel::SocketFactory Wheel::ReadWrite Driver::SysRW Filter::Line Filter::Stream );
16              
17             sub new {
18             my( $package, $alias ) = splice @_, 0, 2;
19              
20             return POE::Session->create(
21             package_states => [
22             $package => [ qw(_get_spread _error _start publish subscribe connect disconnect) ]
23             ],
24             args => [ $alias, @_ ]
25             );
26             }
27              
28             sub _start {
29             my ($kernel, $session, $heap, $alias) = @_[KERNEL, SESSION, HEAP, ARG0];
30             $kernel->alias_set($alias);
31             }
32              
33             sub connect {
34             my $k_heap = $_[HEAP]; # kernel's heap
35             my $sender = $_[SENDER];
36             my $s_name = $_[ARG0];
37             my $p_name = $_[ARG1];
38             my $heap = $sender->get_heap();
39              
40             # Spread doesn't like hostnames without ports
41             unless ($s_name =~ /^\d+$/ or $s_name =~ /@/) {
42             $s_name = '4803@' . $s_name
43             }
44              
45             my ($m, $pg) = Spread::connect( { private_name => $p_name, spread_name => $s_name } );
46             print "m is $m, pg=$pg\n";
47              
48             if (!defined($m)) {
49             $poe_kernel->post( $sender => '_error' => 'connect' );
50             }
51              
52             $k_heap->{groups}->{$pg}->{$sender} = 'self';
53             $heap->{private_name} = $pg;
54              
55             # create a filehandle from the fileno we get back from Spread::connect
56             open $heap->{filehandle}, "<&=$m";
57              
58             $heap->{wheel} = POE::Wheel::ReadWrite->new(
59             Handle => $heap->{filehandle},
60             Driver => POE::Driver::Spread->new(mbox => $m),
61             Filter => POE::Filter::Spread->new(),
62              
63             InputEvent => '_get_spread',
64             ErrorEvent => '_error'
65             );
66             $heap->{spread} = $m;
67             }
68              
69             sub disconnect {
70             my $sender = $_[SENDER];
71             my $heap = $sender->get_heap();
72              
73             $heap->{wheel}->shutdown_input();
74             undef $heap->{wheel};
75             }
76              
77             sub _error {
78             my ($kernel, $session, $heap, $data, $sender, $id) = @_[KERNEL, SESSION, HEAP, ARG0, SENDER, ARG3];
79             my $r_heap = $sender->get_heap();
80             # print "ERROR: ID=$id session:[$session, $heap], sender:[$sender, $r_heap], kernel:$poe_kernel\n";
81             # BLEH
82             }
83              
84             sub _get_spread {
85             my ($kernel, $session, $heap, $data) = @_[KERNEL, SESSION, HEAP, ARG0];
86             my @moo = @$data;
87             my ($type, $sender, $groups, $mess, $endian, $message) = @{$moo[0]};
88              
89             if (!defined($type)) {
90             $kernel->post( $session => '_error' => 'null packet' );
91             return undef;
92             }
93              
94             foreach my $g (@$groups) {
95             foreach my $r (keys %{$heap->{groups}->{$g}}) {
96             my $event = $heap->{groups}->{$g}->{$r};
97             $event .= ($type & REGULAR_MESS) ? "_regular" : "_admin";
98             $kernel->post( $r => $event => [$sender, $message, $type, $groups] );
99             }
100             }
101             }
102              
103             sub publish {
104             my $groups = $_[ARG0];
105             my $message = $_[ARG1];
106             my $heap = $_[HEAP];
107             my $sender = $_[SENDER];
108             my $r_heap = $sender->get_heap();
109              
110             Spread::multicast($r_heap->{spread}, RELIABLE_MESS, $groups, 0, $message);
111             }
112              
113             sub subscribe {
114             my $groups = $_[ARG0];
115             my $heap = $_[HEAP];
116             my $session = $_[SESSION];
117             my $sender = $_[SENDER];
118             my $event = $_[ARG1] || $groups;
119             my $r_heap = $sender->get_heap();
120              
121             $heap->{groups}->{$groups}->{$sender} = $event;
122              
123             Spread::join($r_heap->{spread}, $groups);
124             }
125              
126             $VERSION;
127              
128             __END__