File Coverage

blib/lib/POE/Component/Server/BigBrother.pm
Criterion Covered Total %
statement 39 162 24.0
branch 0 44 0.0
condition 0 12 0.0
subroutine 13 31 41.9
pod 3 3 100.0
total 55 252 21.8


line stmt bran cond sub pod time code
1             # -*- encoding: utf-8; mode: cperl -*-
2              
3             package POE::Component::Server::BigBrother;
4              
5 1     1   21511 use strict;
  1         1  
  1         37  
6 1     1   5 use warnings;
  1         2  
  1         24  
7 1     1   5 use Carp;
  1         5  
  1         81  
8              
9             #sub POE::Kernel::TRACE_REFCNT () { 1 }
10             #sub POE::Kernel::ASSERT_DEFAULT () { 1 }
11             #sub POE::Kernel::TRACE_DEFAULT () { 1 }
12             #sub POE::Kernel::TRACE_SIGNALS () { 1 }
13             #sub POE::Kernel::ASSERT_EVENTS () { 1 }
14              
15 1     1   4 use base qw(POE::Component::Pluggable);
  1         2  
  1         903  
16 1     1   13408 use POE;
  1         50716  
  1         6  
17 1     1   100328 use POE::Component::Pluggable::Constants qw(:ALL);
  1         3  
  1         157  
18 1     1   1193 use POE::Component::Server::TCP;
  1         28062  
  1         43  
19 1     1   1140 use POE::Filter::BigBrother;
  1         929  
  1         29  
20 1     1   815 use POE::Filter::Stream;
  1         356  
  1         31  
21              
22 1     1   809 use Log::Report syntax => 'SHORT';
  1         117656  
  1         8  
23              
24             # use Smart::Comments;
25              
26 1     1   319 use vars qw($VERSION);
  1         4  
  1         54  
27              
28             $VERSION='0.08';
29              
30             #
31             # constants
32             #
33 1     1   8 use constant DATA_TRUNCATED_MESSAGE => "... DATA TRUNCATED ...";
  1         3  
  1         88  
34 1     1   6 use constant DATA_TRUNCATED_MESSAGE_LENGTH => length(DATA_TRUNCATED_MESSAGE);
  1         2  
  1         2331  
35              
36             sub spawn {
37 0     0 1   my $package = shift;
38 0           my %opts = @_;
39             $opts{ lc $_ } = delete $opts{$_}
40 0           for keys %opts; # convert opts to lower case
41 0           my $options = delete $opts{options};
42 0           my $self = bless \%opts, $package;
43              
44 0           $self->_pluggable_init(prefix => 'bb_', types => [ 'MESSAGE', 'EVENT' ]);
45              
46             # default values
47 0   0       $self->{time_out} ||= 30; # default time_out
48 0   0       $self->{bind_port} ||= 1984; # default bind port
49 0   0       $self->{max_msg_size} ||= 16384; # default max message size
50              
51 0 0         $self->{session_id} = POE::Session->create(
52             object_states => [
53             $self => {
54             shutdown => '_on_shutdown',
55             register => '_on_register',
56             unregister => '_on_unregister'
57             },
58             $self => [
59             qw (
60             _start
61             _on_dispatch
62             )
63             ],
64             ],
65             heap => $self,
66             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
67             )->ID();
68 0           return $self;
69             }
70              
71             sub shutdown {
72 0     0 1   my $self = shift;
73 0           $poe_kernel->post( $self->{session_id}, 'shutdown' );
74             }
75              
76             sub _pluggable_event {
77 0     0     my ($self) = @_;
78             ### _pluggable_event: @_
79 0           $poe_kernel->post($self->{session_id}, '_dispatch');
80             }
81              
82             sub _on_dispatch {
83 0     0     my ($kernel,$self,$event,@args) = @_[KERNEL,OBJECT,ARG0,ARG1..$#_];
84 0           $self->_dispatch( $event, @args );
85 0           return;
86             }
87              
88             sub _dispatch {
89 0     0     my ( $self, $event, @args ) = @_;
90             ## _dispatch event: $event
91 0 0         return 1 if $self->_pluggable_process('MESSAGE', $event, \(@args)) == PLUGIN_EAT_ALL;
92              
93 0           my %sessions;
94              
95             # concatenate all session wich correspond to the event
96 0           foreach (
97 0           values %{ $self->{events}->{ $self->{_pluggable_prefix} . 'all' } },
  0            
98             values %{ $self->{events}->{$event} } ) {
99 0           $sessions{$_} = $_;
100             }
101 0           $poe_kernel->post( $_, $event, @args ) for values %sessions;
102             }
103              
104             sub _start {
105 0     0     my ( $kernel, $self, $sender ) = @_[ KERNEL, OBJECT, SENDER ];
106 0           $self->{session_id} = $_[SESSION]->ID();
107 0 0         if ( $self->{alias} ) {
108 0           $kernel->alias_set( $self->{alias} );
109             } else {
110 0           $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
111             }
112              
113             # Adapt to POE::Component::Server::TCP v1.020 new args
114 0 0         my $poco_tcp_args
115             = POE::Component::Server::TCP->VERSION > 1.007 ? "ClientArgs" : "Args";
116              
117             ## Create a tcp server that receives BigBrother messages. It
118             ## will be referred to by the name "server_tcp" when necessary.
119             ## It listen on port 1984. It uses POE::Filter::Block to parse
120             ## input and format output.
121 0 0         $self->{listener} =
122             POE::Component::Server::TCP->new(
123             ( $self->{alias} ? ( Alias => $self->{alias} . '_tcp_listener' ) : () ),
124             $poco_tcp_args => [ self => $self ],
125             Address => $self->{bind_addr},
126             Port => $self->{bind_port},
127             Concurrency => -1,
128             Error => \&_on_tcp_server_error,
129             ClientConnected => \&_on_client_connect,
130             ClientDisconnected => \&_on_client_disconnect,
131             ClientFilter => POE::Filter::Stream->new(),
132             ClientInput => \&_on_client_input,
133             InlineStates => { '_conn_alarm' => \&_conn_alarm, },
134             );
135              
136 0           return;
137             }
138              
139             sub _conn_alarm {
140             ### _conn_alarm
141 0     0     $_[KERNEL]->yield('shutdown');
142             }
143              
144             # Register some event(s)
145             sub _on_register {
146 0     0     my ($kernel, $session, $sender, $self, @events) = @_[KERNEL, SESSION, SENDER, OBJECT, ARG0 .. $#_ ];
147 0 0         croak 'Not enough arguments' unless @events;
148              
149 0           my $sender_id = $sender->ID();
150              
151 0           foreach my $event (@events) {
152 0 0         $event = $self->{_pluggable_prefix} . $event unless $event =~ /^_/;
153 0           $self->{events}->{$event}->{$sender_id} = $sender_id;
154 0           $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
155 0 0 0       unless ($self->{sessions}->{$sender_id}->{refcnt}++ or $session == $sender) {
156             # One count for every event
157 0           $kernel->refcount_increment($sender_id, __PACKAGE__);
158             }
159 0           $kernel->yield(_dispatch => $self->{_pluggable_prefix} . 'registered', $sender_id);
160             }
161 0           return;
162             }
163              
164             sub _on_unregister {
165 0     0     my ( $kernel, $self, $session, $sender, @events ) =
166             @_[ KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_ ];
167              
168 0 0         die "Not enough arguments for unregister event" unless @events;
169              
170 0           my $sender_id = $sender->ID();
171 0           foreach (@events) {
172 0           delete $self->{events}->{$_}->{$sender_id};
173 0 0         if ( --$self->{sessions}->{$sender_id}->{refcnt} <= 0 ) {
174 0           delete $self->{sessions}->{$sender_id};
175 0 0         unless ( $session == $sender ) {
176 0           $kernel->refcount_decrement( $sender_id, __PACKAGE__ );
177             }
178             }
179             }
180 0           return;
181             }
182              
183             sub _unregister_sessions {
184 0     0     my $self = shift;
185 0           foreach my $session_id ( keys %{ $self->{sessions} } ) {
  0            
186 0           my $refcnt = $self->{sessions}->{$session_id}->{refcnt};
187 0           while ( $refcnt --> 0 ) {
188 0           $poe_kernel->refcount_decrement($session_id, __PACKAGE__);
189             }
190 0           delete $self->{sessions}->{$session_id};
191             }
192             }
193              
194             sub _on_shutdown {
195 0     0     my ($kernel,$self) = @_[KERNEL,OBJECT];
196             ### Shutting down BigBrother Gateway
197 0           $self->_unregister_sessions();
198 0           $kernel->alarm_remove_all();
199 0           $kernel->alias_remove( $_ ) for $kernel->alias_list();
200 0 0         $kernel->post( $self->{listener}, 'shutdown' ) if $self->{listener};
201 0           $self->_pluggable_destroy();
202             ### Waiting all clients to disconnect
203 0           return;
204             }
205              
206             sub _on_tcp_server_error {
207 0     0     my ($syscall_name, $error_number, $error_string) = @_[ARG0, ARG1, ARG2];
208 0           croak "BigBrother Gateway: $syscall_name error because $error_string\n";
209             }
210              
211             sub _on_client_connect {
212 0     0     my ($kernel, $sender, $heap) = @_ [ KERNEL, SESSION, HEAP ];
213 0           my %args;
214 0 0         if ( ref $_[ARG0] eq 'HASH' ) {
    0          
215 0           %args = %{ $_[ARG0] };
  0            
216             } elsif ( ref $_[ARG0] eq 'ARRAY' ) {
217 0           %args = @{ $_[ARG0] };
  0            
218             } else {
219 0           %args = @_[ARG0..$#_];
220             }
221 0           my $self = delete $args{self};
222 0           $heap->{bb_server} = $self; # store self object on the client session as server
223 0           $heap->{buffer} = '';
224 0           _delay_timeout($kernel, $self);
225             }
226              
227             sub _on_client_input {
228             # Accumulate datas
229 0     0     $_[HEAP]->{buffer} .= $_[ARG0];
230             }
231              
232             sub _decode_bb_message {
233 0     0     my ($self, $input) = @_;
234 0           my $message = undef;
235 0           my $input_length = length($input);
236              
237 0 0         if ( $input_length > $self->{max_msg_size} ) {
238 0           print STDERR "Truncated too long BigBrother message ($input_length > ",
239             $self->{max_msg_size} . "):\n", substr( $input, 0, 80 ), "\n";
240 0           my $pos = $self->{max_msg_size} - DATA_TRUNCATED_MESSAGE_LENGTH;
241 0           substr( $input, $pos, $input_length - $pos, DATA_TRUNCATED_MESSAGE );
242             }
243              
244 0 0         if (
    0          
245             $input =~ m/^
246             ((?:(?:(?:dis|en)abl|pag)e|status)) # the command ($1)
247             (\+\d+)? # the offset ($2)
248             \s+ # some spaces
249             (\S+?)\.(\S+) # server.probe ($3, $4)
250             \s+ # some spaces
251             (.*)$ # last args ($5)
252             /sx
253             ) {
254 0           my $command = lc($1);
255 0           $message->{command} = $command;
256 0 0         $message->{offset} = $2 if defined $2;
257 0           $message->{host_name} = $3;
258 0           $message->{probe} = $4;
259 0           my $args = $5;
260              
261 0           $message->{host_name} =~ tr/,/./; # Translate server fqdn
262              
263 0 0         if ( $command eq 'enable' ) { # Enable command
264 0           $message->{data} = $args;
265             } else {
266 0           my ( $arg1, $arg2 ) = split( /\s+/, $args, 2 );
267 0 0 0       if ( $command eq 'status' or $command eq 'page' ) {
268 0           $message->{color} = $arg1;
269             }
270             else { # Disable command
271 0           $message->{period} = $arg1;
272             }
273 0           $message->{data} = $arg2;
274             }
275             } elsif ($input =~ m/^(?:event\s+)(.+)$/si) {
276 0           $message = { command => 'event', params => $1 };
277             } else {
278 0           warning "Unknown BB message: ".substr($input,0,80);
279             }
280              
281 0           return $message;
282             }
283              
284             sub _on_client_disconnect {
285 0     0     my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
286              
287             # Remove all alarms
288 0           $kernel->alarm_remove_all();
289              
290             # Check if we have receive some datas
291 0 0         if (length $heap->{buffer}) {
292             # Create a new filter to parse raw BB messages
293 0           my $filter = POE::Filter::BigBrother->new();
294 0           my $bb_server = $heap->{bb_server};
295             # Decode any BB messages
296 0           foreach my $cooked_input ( @{ $filter->get( [ $heap->{buffer} ] ) } ) {
  0            
297 0 0         if ( my $message = $bb_server->_decode_bb_message($cooked_input) ) {
298 0           $bb_server->_dispatch(
299             $bb_server->{_pluggable_prefix} . $message->{command},
300             $message, $bb_server );
301             }
302             }
303             }
304             }
305              
306             sub _delay_timeout {
307 0     0     my ($kernel,$self) = @_;
308 0           $kernel->delay( '_conn_alarm', $self->{time_out} );
309             }
310              
311             sub session_id {
312 0     0 1   my ($self) = @_;
313 0           return $self->{session_id};
314             }
315              
316             1; # End of POE::Component::Server::BigBrother
317             __END__