File Coverage

blib/lib/POE/Component/Client/Asterisk/Manager.pm
Criterion Covered Total %
statement 21 207 10.1
branch 0 104 0.0
condition 0 30 0.0
subroutine 7 29 24.1
pod 1 3 33.3
total 29 373 7.7


line stmt bran cond sub pod time code
1             package POE::Component::Client::Asterisk::Manager;
2              
3             ######################################################################
4             ### POE::Component::Client::Asterisk::Manager
5             ### David Davis (xantus@cpan.org)
6             ###
7             ### Copyright (c) 2003-2005 David Davis and Teknikill. All Rights
8             ### Reserved. This module is free software; you can redistribute it
9             ### and/or modify it under the same terms as Perl itself.
10             ######################################################################
11              
12 1     1   319404 use strict;
  1         4  
  1         48  
13 1     1   5 use warnings;
  1         2  
  1         57  
14              
15             our $VERSION = '0.08';
16              
17 1     1   7 use Carp qw(croak);
  1         7  
  1         83  
18 1     1   7 use POE qw( Component::Client::TCP );
  1         2  
  1         10  
19 1     1   547 use Digest::MD5;
  1         2  
  1         2219  
20              
21 0     0 0   sub DEBUG { 0 }
22              
23             sub new {
24 0     0 1   my $package = shift;
25 0 0         croak "$package requires an even number of parameters" if @_ % 2;
26 0           my %params = @_;
27 0           my $alias = $params{'Alias'};
28              
29 0 0 0       $alias = 'asterisk_client' unless defined($alias) and length($alias);
30              
31 0   0       my $listen_port = $params{listen_port} || 5038;
32              
33 0           POE::Session->create(
34             #options => {trace=>1},
35             args => [ %params ],
36             package_states => [
37             'POE::Component::Client::Asterisk::Manager' => {
38             _start => '_start',
39             _stop => '_stop',
40             signals => 'signals',
41             }
42             ],
43             inline_states => $params{inline_states},
44             # {
45             # _default => sub {
46             # print STDERR "$_[STATE] called\n";
47             # },
48             # },
49             );
50              
51 0           return 1;
52             }
53              
54             sub _start {
55 0     0     my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
56 0           my %params = splice(@_,ARG0);
57              
58 0 0         if (ref($params{Options}) eq 'HASH') {
59 0           $session->option( %{$params{Options}} );
  0            
60             }
61              
62 0   0       $params{reconnect_time} = $params{reconnect_time} || 5;
63 0   0       $params{requeue_posts} = $params{requeue_posts} || undef;
64              
65 0           $kernel->alias_set($params{Alias});
66              
67             # watch for SIGINT
68             # $kernel->sig('INT', 'signals');
69              
70             $heap->{client} = POE::Component::Client::TCP->new(
71             RemoteAddress => $params{RemoteHost},
72             RemotePort => $params{RemotePort},
73             # no longer a seperate package - see below
74             Filter => "POE::Filter::Asterisk::Manager",
75             Alias => "$params{Alias}_client",
76             Args => [ \%params ],
77             Started => sub {
78 0     0     $_[HEAP]->{params} = $_[ARG0];
79             },
80             Disconnected => sub {
81 0     0     $_[KERNEL]->delay(reconnect => $_[HEAP]->{params}->{reconnect_time});
82             },
83             Connected => sub {
84 0     0     my $heap = $_[HEAP];
85 0 0         DEBUG && print STDERR sprintf("connected to %s:%s\n",$heap->{params}->{RemoteHost},$heap->{params}->{RemotePort});
86 0           $heap->{_connected} = 0;
87 0           $heap->{_logged_in} = 0;
88 0           $heap->{_auth_stage} = 0;
89 0           $_[KERNEL]->delay( recv_timeout => 5 );
90              
91 0 0         if ($heap->{params}->{Astmanproxy}) {
92             # For astmanproxy? Don't wait for a response
93 0           $heap->{_connected} = 1;
94 0           $kernel->call($_[SESSION] => login => splice(@_,ARG0));
95             }
96             },
97             ConnectError => sub {
98 0     0     my $heap = $_[HEAP];
99 0 0         DEBUG && print STDERR sprintf("could not connect to %s:%s, reconnecting in %s seconds...\n"
100             ,$heap->{params}->{RemoteHost},$heap->{params}->{RemotePort},$heap->{params}->{reconnect_time});
101 0           $_[KERNEL]->delay(reconnect => $heap->{params}->{reconnect_time});
102             },
103             ServerInput => sub {
104 0     0     my ( $kernel, $heap, $input ) = @_[KERNEL, HEAP, ARG0];
105              
106 0 0         DEBUG && do {
107 0           require Data::Dumper;
108 0           print Data::Dumper->Dump([$input],['input']);
109             };
110              
111 0 0 0       if ($heap->{_logged_in} == 0 && $heap->{_connected} == 0) {
    0 0        
    0          
112 0           $kernel->delay( recv_timeout => 5 );
113 0 0         if ($input->{acm_version}) {
114 0           $heap->{_version} = $input->{acm_version};
115 0           $heap->{_connected} = 1;
116 0           $kernel->call($_[SESSION] => login => splice(@_,ARG0));
117             } else {
118 0           print STDERR "Invalid Protocol (wrong port?)\n";
119 0           $kernel->yield("shutdown");
120             }
121             } elsif ($heap->{_connected} == 1 && $heap->{_logged_in} == 0) {
122 0           $kernel->call($_[SESSION] => login => splice(@_,ARG0));
123             } elsif ($heap->{_logged_in} == 1) {
124 0           $kernel->call($_[SESSION] => callbacks => splice(@_,ARG0));
125             }
126             },
127              
128             InlineStates => {
129             _put => sub {
130 0     0     my $heap = $_[HEAP];
131 0 0         if ($heap->{server}) {
132 0           $heap->{server}->put($_[ARG0]);
133             } else {
134 0 0         if ($heap->{requeue_posts}) {
135 0           push(@{$heap->{queued}},$_[ARG0]);
  0            
136             } else {
137 0           print STDERR "cannot send when not connected! -ignored-\n";
138             }
139             }
140             },
141             login_complete => sub {
142 0     0     my ( $kernel, $heap ) = @_[KERNEL, HEAP];
143 0 0         DEBUG && print STDERR "logged in and ready to process events\n";
144             # call the _connected state
145 0           $kernel->yield("_connected" => splice(@_,ARG0));
146             },
147             recv_timeout => sub {
148 0     0     my ( $kernel, $heap ) = @_[KERNEL, HEAP];
149 0 0         unless ($heap->{_connected} == 1) {
150 0           print STDERR "Timeout waiting for response\n";
151 0           $heap->{_connected} = 0;
152 0           $heap->{_logged_in} = 0;
153 0           $kernel->yield("shutdown");
154             }
155             },
156             login => sub {
157 0     0     my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];
158 0 0         if ($heap->{_logged_in} == 1) {
159             # shouldn't get here
160 0 0         DEBUG && print STDERR "Login called when already logged in\n";
161             #$kernel->yield(callbacks => splice(@_,ARG0));
162 0           return;
163             }
164 0 0         if ($heap->{_auth_stage} == 0) {
    0          
    0          
165 0           $heap->{server}->put({'Action' => 'Challenge', 'AuthType' => 'MD5'});
166 0           $heap->{_auth_stage} = 1;
167             } elsif ($heap->{_auth_stage} == 1) {
168 0 0 0       unless ($input->{Response} && lc($input->{Response}) eq 'success') {
169 0           print STDERR "AuthType MD5 may not be supported\n";
170 0           $kernel->yield("shutdown");
171 0           return;
172             }
173 0 0         if ($input->{Challenge}) {
174 0 0         if (! defined $heap->{params}->{Password}) {
175 0           print STDERR "No password provided\n";
176 0           $kernel->yield("shutdown");
177 0           return;
178             }
179              
180 0           my $digest = Digest::MD5::md5_hex("$input->{Challenge}$heap->{params}->{Password}");
181 0           $heap->{server}->put({'Action' => 'Login', 'AuthType' => 'MD5', 'Username' => $heap->{params}->{Username}, 'Key' => $digest });
182 0           $heap->{_auth_stage} = 2;
183             }
184             } elsif ($heap->{_auth_stage} == 2) {
185 0 0 0       if ($input->{Message} && lc($input->{Message}) eq 'authentication accepted') {
    0 0        
186 0           delete $heap->{_auth_stage};
187 0           $heap->{_logged_in} = 1;
188             # I remembered inline_states not working (above), so i put this in
189 0           foreach my $k (keys %{$heap->{params}->{inline_states}}) {
  0            
190 0           $kernel->state($k => $heap->{params}->{inline_states}{$k});
191             }
192 0 0         if (ref($heap->{queued}) eq 'ARRAY') {
193 0           foreach my $a (@{$heap->{queued}}) {
  0            
194 0           $heap->{server}->put($a);
195             }
196 0           delete $heap->{queued};
197             }
198 0           $kernel->yield(login_complete => splice(@_,ARG0));
199             } elsif ($input->{Message} && lc($input->{Message}) eq 'authentication failed') {
200 0           print STDERR "Authentication failed.\n";
201 0           $heap->{_connected} = 0;
202 0           $heap->{_logged_in} = 0;
203 0           $kernel->yield("shutdown");
204             }
205             }
206             },
207             callbacks => sub {
208 0     0     my ($kernel, $heap, $session, $input) = @_[KERNEL, HEAP, SESSION, ARG0];
209             # TODO this stuff needs some work
210 0 0         next unless (ref($input));
211 0           my $qual = 0;
212 0           foreach my $k (keys %{$heap->{params}->{CallBacks}}) {
  0            
213 0           my $match = 0;
214 0 0 0       if (ref($heap->{params}->{CallBacks}{$k}) eq 'HASH') {
    0          
215 0           foreach my $c (keys %{$heap->{params}->{CallBacks}{$k}}) {
  0            
216 0 0         last if ($match == 1);
217 0 0 0       if (exists($input->{$c}) && $heap->{params}->{CallBacks}{$k}{$c} eq $input->{$c}) {
218 0           $match = 2;
219 0           $qual++;
220             } else {
221 0           $match = 1;
222             }
223             }
224             # matched ALL of the callback (not 2 of them like it looks like)
225 0 0         if ($match == 2) {
226             # callback good
227 0 0         DEBUG && print STDERR "callback $k is good\n";
228 0           $kernel->yield($k => $input);
229             }
230             } elsif ($heap->{params}->{CallBacks}{$k} eq ':all' || $heap->{params}->{CallBacks}{$k} eq 'default') {
231 0           $kernel->yield($k => $input);
232             } else {
233 0           print STDERR "Incorrectly written callback $k\n";
234             }
235             }
236             # use the :all qualifier now
237             #if ($qual == 0) {
238             # $kernel->yield("default" => splice(@_,ARG0));
239             #}
240             },
241             },
242 0           );
243 0 0         DEBUG && print STDERR "Client started.\n";
244             }
245              
246             sub _stop {
247 0     0     $_[KERNEL]->yield("shutdown");
248 0 0         DEBUG && print STDERR "Client stopped.\n";
249             }
250              
251             # Handle incoming signals (INT)
252              
253             # TODO disconnect gracefully
254             sub signals {
255 0     0 0   my $signal_name = $_[ARG0];
256            
257             # DEBUG && print STDERR "Client caught SIG$signal_name\n";
258            
259             # do not handle the signal
260 0           return 0;
261             }
262              
263              
264             1;
265              
266             package POE::Filter::Asterisk::Manager;
267              
268 1     1   9 use strict;
  1         1  
  1         35  
269 1     1   5 use Carp qw(croak);
  1         2  
  1         1139  
270              
271 0     0     sub DEBUG { 0 };
272              
273             sub new {
274 0     0     my $type = shift;
275 0           my $self = {
276             buffer => '',
277             crlf => "\x0D\x0A",
278             };
279 0           bless $self, $type;
280 0           $self;
281             }
282              
283             sub get {
284 0     0     my ($self, $stream) = @_;
285              
286             # Accumulate data in a framing buffer.
287 0           $self->{buffer} .= join('', @$stream);
288              
289 0           my $many = [];
290 0           while (1) {
291 0           my $input = $self->get_one([]);
292 0 0         if ($input) {
293 0           push(@$many,@$input);
294             } else {
295 0           last;
296             }
297             }
298              
299 0           return $many;
300             }
301              
302             sub get_one_start {
303 0     0     my ($self, $stream) = @_;
304              
305 0 0         DEBUG && do {
306 0           my $temp = join '', @$stream;
307 0           $temp = unpack 'H*', $temp;
308 0           warn "got some raw data: $temp\n";
309             };
310              
311             # Accumulate data in a framing buffer.
312 0           $self->{buffer} .= join('', @$stream);
313             }
314              
315             sub get_one {
316 0     0     my $self = shift;
317              
318 0 0         return [] if ($self->{finish});
319              
320              
321 0 0         if ($self->{buffer} =~ s#^(?:Asterisk|Aefirion) Call Manager(?: Proxy)?/(\d+\.\d+\w*)$self->{crlf}##is) {
322 0           return [{ acm_version => $1 }];
323             }
324              
325 0 0         return [] unless ($self->{crlf});
326 0           my $crlf = $self->{crlf};
327              
328             # collect lines in buffer until we find a double line
329 0 0         return [] unless($self->{buffer} =~ m/${crlf}${crlf}/s);
330              
331              
332 0           $self->{buffer} =~ s/(^.*?)(${crlf}${crlf})//s;
333              
334 0           my $buf = "$1${crlf}";
335            
336 0           my $kv = {};
337              
338 0           foreach my $line (split(/(:?${crlf})/,$buf)) {
339 0           my $tmp = $line;
340 0           $tmp =~ s/\r|\n//g;
341 0 0         next unless($tmp);
342              
343 0 0         if ($line =~ m/([\w\-]+)\s*:\s+(.*)/) {
344 0           my $key = $1;
345 0           my $val = $2;
346 0 0         DEBUG && print "recv key $key: $val\n";
347              
348 0 0         if ($key eq 'Variable',) {
349 0           for my $v( split /\r/, $val ) {
350 0           $v =~ s/^Variable\:\s*//;
351 0           my @parts = split /=/, $v;
352 0           $kv->{$key}{$parts[0]} = $parts[1];
353 0 0         DEBUG && print " recv variable: $parts[0] => $parts[1]\n";
354             }
355             } else {
356 0           $kv->{$key} = $val;
357             }
358             } else {
359 0           $kv->{content} .= "$line";
360             }
361             }
362              
363 0 0         return (keys %$kv) ? [$kv] : [];
364             }
365              
366             sub put {
367 0     0     my ($self, $hrefs) = @_;
368 0           my @raw;
369 0           for my $i ( 0 .. $#{$hrefs} ) {
  0            
370 0 0         if (ref($hrefs->[$i]) eq 'HASH') {
    0          
    0          
371 0           foreach my $k (keys %{$hrefs->[$i]}) {
  0            
372 0 0         DEBUG && print "send key $k: $hrefs->[$i]{$k}\n";
373 0           push(@raw,"$k: $hrefs->[$i]{$k}$self->{crlf}");
374             }
375             } elsif (ref($hrefs->[$i]) eq 'ARRAY') {
376 0           push(@raw, join("$self->{crlf}", @{$hrefs->[$i]}, ""));
  0            
377             } elsif (ref($hrefs->[$i]) eq 'SCALAR') {
378 0           push(@raw, $hrefs->[$i]);
379             } else {
380 0           croak "unknown type ".ref($hrefs->[$i])." passed to ".__PACKAGE__."->put()";
381             }
382 0           push(@raw,"$self->{crlf}");
383             }
384 0           \@raw;
385             }
386              
387             sub get_pending {
388 0     0     my $self = shift;
389 0 0         return [ $self->{buffer} ] if length $self->{buffer};
390 0           return undef;
391             }
392              
393             1;
394              
395             __END__