File Coverage

blib/lib/Padre/Plugin/Swarm/Transport/Global.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Padre::Plugin::Swarm::Transport::Global;
2 1     1   37620 use strict;
  1         486  
  1         56  
3 1     1   6 use warnings;
  1         3  
  1         78  
4 1     1   6 use Carp 'confess';
  1         1  
  1         74  
5 1     1   3067 use Padre::Logger;
  0            
  0            
6             use Data::Dumper;
7             use base qw( Object::Event );
8             use Padre::Swarm::Message;
9             use AnyEvent::Socket;
10             use AnyEvent::Handle;
11             use Scalar::Util 'blessed';
12             use JSON;
13              
14             our $VERSION = '0.2';
15              
16             sub new {
17             my $class = shift;
18             my $self = $class->SUPER::new(@_);
19             $self->reg_cb( 'start_session' => \&start_session );
20             return $self;
21             }
22              
23             sub enable {
24             my $self = shift;
25             my $g = tcp_connect $self->{host} , $self->{port},
26             sub { $self->event( 'start_session', shift) };
27             $self->{g} = $g;
28             }
29              
30             sub start_session {
31             my ($self,$fh) = @_;
32             unless ($fh) {
33             $self->event('disconnect','Connection failed ' . $!);
34             return;
35             }
36             my $h = AnyEvent::Handle->new(
37             fh => $fh,
38             json => $self->_marshal,
39             on_eof => sub { $self->event('disconnect', shift ) },
40             );
41            
42            
43             # now we register our own disconnect handler for teardown;
44             $self->reg_cb('disconnect', \&disconnect );
45            
46             $self->{h} = $h;
47             $h->push_write( json => { trustme=>$$.rand() } );
48             $h->push_read( json => sub { $self->event( 'see_auth' , @_ ) } );
49             $self->reg_cb( 'see_auth' , \&see_auth );
50            
51             }
52              
53              
54             sub disconnect {
55             my $self = shift;
56              
57             if ($self->{h}) {
58             $self->{h}->destroy;
59             delete $self->{h};
60             }
61             delete $self->{chirp};
62            
63             $self->unreg_me;
64            
65             }
66              
67              
68             sub see_auth {
69             my $self = shift;
70             my $handle = shift;
71             my $message = shift;
72             $self->unreg_cb('start_session');
73             $self->{h} = $handle;
74             $self->{token} = $message->{token};
75             if ( $message->{session} eq 'authorized' ) {
76             $self->{h}->on_read( sub {
77             shift->push_read( json => sub { $self->event('recv',$_[1]) } );
78             }
79             );
80             $self->event('connect'=>$self->{token} );
81             # this is hideous but works for me
82             # timer pushes some data to the socket every so often to convince
83             # firewalls "I really DO want this connection - OK!"
84             my $chirp = AnyEvent->timer(
85             after => 60,
86             interval => 300,
87             cb => sub { $self->send( {type=>'noop'}) }
88             );
89             $self->{chirp} = $chirp;
90             }
91             else {
92             $self->{h}->destroy;
93             delete $self->{h};
94             $self->event('disconnect','Authorization failed');
95            
96             }
97             }
98              
99             sub send {
100             my $self = shift;
101             my $message = shift;
102             if ( threads::shared::is_shared( $message ) ) {
103             TRACE( "SEND A SHARED REFERENCE ?!?!?! - " . Dumper $message );
104             confess "$message , is a shared value";
105             }
106             $message->{token} = $self->{token};
107             $self->{h}->push_write( json => $message );
108             # implement our own loopback ?
109             # nasty but fake what the deserializing marshal _would_ do.
110             unless ( blessed $message ) {
111             bless $message, 'Padre::Swarm::Message';
112             }
113             $self->event('recv', $message );
114            
115             }
116              
117             sub _marshal {
118             JSON->new
119             ->allow_blessed
120             ->convert_blessed
121             ->utf8
122             ->filter_json_object(\&synthetic_class );
123             }
124              
125              
126             sub synthetic_class {
127             my $var = shift ;
128             if ( exists $var->{__origin_class} ) {
129             my $stub = $var->{__origin_class};
130             my $msg_class = 'Padre::Swarm::Message::' . $stub;
131             my $instance = bless $var , $msg_class;
132             return $instance;
133             } else {
134             return bless $var , 'Padre::Swarm::Message';
135             }
136             };
137              
138              
139             1;