File Coverage

blib/lib/Padre/Plugin/Swarm/Transport/Local.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Padre::Plugin::Swarm::Transport::Local;
2 1     1   22635 use strict;
  1         3  
  1         37  
3 1     1   6 use warnings;
  1         2  
  1         30  
4 1     1   6 use Carp 'confess';
  1         2  
  1         74  
5 1     1   1670 use Padre::Logger;
  0            
  0            
6             use Data::Dumper;
7             use base qw( Object::Event );
8             use AnyEvent::Handle;
9             use IO::Socket::Multicast;
10             use Padre::Swarm::Message;
11             use JSON;
12             our $VERSION = '0.2';
13              
14             =pod
15              
16             =head1 NAME
17              
18             Padre::Plugin::Swarm::Transport::Local - Multicast swarm message bus
19              
20             =head1 DESCRIPTION
21              
22             =head1 SYNOPSIS
23              
24             my $t = Padre::Plugin::Swarm::Transport::Local->new();
25             $t->reg_cb('connect' , sub { printf "Transport %s connected", shift } );
26             $t->reg_cb('recv', \&incoming_message );
27             $t->reg_cb('disconnect', sub { warn "Disconnected" } );
28            
29             $t->enable;
30              
31             =cut
32              
33             sub new {
34             my $class = shift;
35             my $self = $class->SUPER::new(@_);
36             $self->{token} = $$.rand();
37             return $self;
38             }
39              
40             sub enable {
41             my $self = shift;
42            
43             my $m = IO::Socket::Multicast->new(
44             LocalPort => 12000,
45             ReuseAddr => 1,
46             ) or die $!;
47            
48             $m->mcast_add('239.255.255.1'); #should have the interface
49             $m->mcast_loopback( 1 );
50            
51             $self->{m} = $m;
52             $self->{io} = AnyEvent->io(
53             fh => $m,
54             poll => 'r',
55             cb => sub {
56             $self->event('readable')
57             }
58             );
59             $self->reg_cb( 'readable' , \&readable );
60             $self->reg_cb('disconnect', \&disconnect );
61             $self->event('connect',$self->{token} );
62            
63             return;
64             }
65              
66             sub send {
67             my $self = shift;
68             my $message = shift;
69            
70             if ( threads::shared::is_shared( $message ) ) {
71             TRACE( "SEND A SHARED REFERENCE ?!?!?! - " . Dumper $message );
72             confess "$message , is a shared value";
73             }
74              
75             $message->{token} = $self->{token};
76             my $data = eval { $self->_marshal->encode($message) };
77             if ($data) {
78             $self->{m}->mcast_send(
79             $data, '239.255.255.1:12000'
80             );
81             }
82             }
83              
84             sub readable {
85             my $self = shift;
86             my $data;
87             unless ( $self->{m} ) {
88             TRACE( 'Multicast handle has gone away!' );
89             return;
90             }
91             $self->{m}->recv($data,65535);
92             my $message = eval{ $self->_marshal->decode($data) };
93             if ( $message ) {
94             $self->event('recv', $message);
95             }
96            
97             }
98              
99              
100             sub disconnect {
101             my $self = shift;
102             if ( $self->{io} ) {
103             delete $self->{io};
104             my $m = delete $self->{m};
105             $m->mcast_drop('239.255.255.1');
106             }
107             $self->unreg_me;
108             }
109              
110             sub _marshal {
111             JSON->new
112             ->allow_blessed
113             ->convert_blessed
114             ->utf8
115             ->filter_json_object(\&synthetic_class );
116             }
117              
118              
119             sub synthetic_class {
120             my $var = shift ;
121             if ( exists $var->{__origin_class} ) {
122             my $stub = $var->{__origin_class};
123             my $msg_class = 'Padre::Swarm::Message::' . $stub;
124             my $instance = bless $var , $msg_class;
125             return $instance;
126             } else {
127             return bless $var , 'Padre::Swarm::Message';
128             }
129             };
130              
131              
132             1;