File Coverage

blib/lib/Net/SIP/Simple/RTP.pm
Criterion Covered Total %
statement 176 217 81.1
branch 61 106 57.5
condition 33 61 54.1
subroutine 15 15 100.0
pod 2 2 100.0
total 287 401 71.5


line stmt bran cond sub pod time code
1             ###########################################################################
2             # Net::SIP::Simple::RTP
3             # implements some RTP behaviors
4             # - media_recv_echo: receive and echo data with optional delay back
5             # can save received data
6             # - media_send_recv: receive and optionally save data. Sends back data
7             # from file with optional repeat count
8             ###########################################################################
9              
10 43     43   240 use strict;
  43         70  
  43         1018  
11 43     43   174 use warnings;
  43         69  
  43         1349  
12              
13             package Net::SIP::Simple::RTP;
14              
15 43     43   216 use Net::SIP::Util qw(invoke_callback ip_sockaddr2parts ip_parts2string);
  43         96  
  43         2017  
16 43     43   267 use Socket;
  43         70  
  43         19227  
17 43     43   278 use Net::SIP::Debug;
  43         104  
  43         224  
18 43     43   16236 use Net::SIP::DTMF;
  43         171  
  43         2557  
19 43     43   281 use Net::SIP::Dispatcher::Eventloop;
  43         81  
  43         2320  
20              
21              
22             # on MSWin32 non-blocking sockets are not supported from IO::Socket
23 43     43   238 use constant CAN_NONBLOCKING => $^O ne 'MSWin32';
  43         70  
  43         119056  
24              
25             ###########################################################################
26             # creates function which will initialize Media for echo back
27             # Args: ($writeto,$delay)
28             # $delay: how many packets to delay between receive and echo back (default 0)
29             # if <0 no data will be sent back (i.e. recv only)
30             # $writeto: where to save received data (default: don't save)
31             # Returns: [ \&sub,@args ]
32             ###########################################################################
33             sub media_recv_echo {
34 57     57 1 197 my ($writeto,$delay) = @_;
35              
36             my $sub = sub {
37 32     32   89 my ($delay,$writeto,$call,$args) = @_;
38              
39 32         83 my $lsocks = $args->{media_lsocks};
40 32   33     402 my $ssocks = $args->{media_ssocks} || $lsocks;
41 32         94 my $raddr = $args->{media_raddr};
42 32         200 my $mdtmf = $args->{media_dtmfxtract};
43 32         62 my $didit = 0;
44 32         137 for( my $i=0;$i<@$lsocks;$i++ ) {
45 32   50     251 my $sock = $lsocks->[$i] || next;
46 32 50       1499 $sock = $sock->[0] if UNIVERSAL::isa( $sock,'ARRAY' );
47 32         81 my $s_sock = $ssocks->[$i];
48 32 50       132 $s_sock = $s_sock->[0] if UNIVERSAL::isa( $s_sock,'ARRAY' );
49              
50 32         82 my $addr = $raddr->[$i];
51 32 100       143 $addr = $addr->[0] if ref($addr);
52              
53 32         88 my @delay_buffer;
54 32         53 my $channel = $i;
55             my $echo_back = sub {
56 1498         7635 my ($s_sock,$remote,$delay_buffer,$delay,$writeto,$targs,$didit,$sock) = @_;
57             {
58 1498 100       3404 my ($buf,$mpt,$seq,$tstamp,$ssrc,$csrc) =
  3251         27826  
59             _receive_rtp($sock,$writeto,$targs,$didit,$channel)
60             or last;
61             #DEBUG( "$didit=$$didit" );
62 1903         7446 $$didit = 1;
63              
64 1903 100 66     11277 last if ! $s_sock || ! $remote; # call on hold ?
65              
66 1853         8578 my @pkt = _generate_dtmf($targs,$seq,$tstamp,0x1234);
67 1853 50 33     7511 if (@pkt && $pkt[0] ne '') {
68 0         0 DEBUG( 100,"send DTMF to RTP");
69 0         0 send( $s_sock,$_,0,$remote ) for(@pkt);
70 0         0 return; # send DTMF *instead* of echo data
71             }
72              
73 1853 100       6196 last if $delay<0;
74 1753         4497 push @$delay_buffer, $buf;
75 1753         17269 while ( @$delay_buffer > $delay ) {
76 1753         434596 send( $s_sock,shift(@$delay_buffer),0,$remote );
77             }
78 1753         17370 CAN_NONBLOCKING && redo; # try recv again
79             }
80 32         577 };
81              
82             $call->{loop}->addFD($sock, EV_READ,
83             [ $echo_back,$s_sock,$addr,\@delay_buffer,$delay || 0,$writeto,{
84             dtmf_gen => $args->{dtmf_events},
85             dtmf_xtract => $mdtmf && $mdtmf->[$i] && $args->{cb_dtmf}
86 32   100     1071 && [ $mdtmf->[$i], $args->{cb_dtmf} ],
      50        
87             },\$didit ],
88             'rtp_echo_back' );
89 32         222 my $reset_to_blocking = CAN_NONBLOCKING && $s_sock->blocking(0);
90 32         628 push @{ $call->{ rtp_cleanup }}, [ sub {
91 32         83 my ($call,$sock,$rb) = @_;
92 32         191 DEBUG( 100,"rtp_cleanup: remove socket %d",fileno($sock));
93 32         266 $call->{loop}->delFD( $sock );
94 32 50       262 $sock->blocking(1) if $rb;
95 32         598 }, $call,$sock,$reset_to_blocking ];
96             }
97              
98             # on RTP inactivity for at least 10 seconds close connection
99             my $timer = $call->{dispatcher}->add_timer( 10,
100             [ sub {
101 0         0 my ($call,$didit,$timer) = @_;
102 0 0       0 if ( $$didit ) {
103 0         0 $$didit = 0;
104             } else {
105 0         0 DEBUG(10, "closing call because if inactivity" );
106 0         0 $call->bye;
107 0         0 $timer->cancel;
108             }
109 32         938 }, $call,\$didit ],
110             10,
111             'rtp_inactivity',
112             );
113 32         827 push @{ $call->{ rtp_cleanup }}, [
114             sub {
115 32         182 shift->cancel;
116 32         120 DEBUG( 100,"cancel RTP timer" );
117             },
118 32         160 $timer
119             ];
120 57         1572 };
121              
122 57         473 return [ $sub,$delay,$writeto ];
123             }
124              
125             ###########################################################################
126             # creates function which will initialize Media for saving received data
127             # into file and sending data from another file
128             # Args: ($readfrom;$repeat,$writeto)
129             # $readfrom: where to read data for sending from (filename or callback
130             # which returns payload)
131             # $repeat: if <0 the data in $readfrom will be sent again and again
132             # if >0 the data in $readfrom will be sent $repeat times (0 or undef is treated as 1)
133             # $writeto: where to save received data (undef == don't save), either
134             # filename or callback which gets packet as argument
135             # Returns: [ \&sub,@args ]
136             ###########################################################################
137             sub media_send_recv {
138 16     16 1 50 my ($readfrom,$repeat,$writeto) = @_;
139              
140             my $sub = sub {
141 16     16   71 my ($writeto,$readfrom,$repeat,$call,$args) = @_;
142              
143 16         38 my $lsocks = $args->{media_lsocks};
144 16   33     122 my $ssocks = $args->{media_ssocks} || $lsocks;
145 16         40 my $raddr = $args->{media_raddr};
146 16         31 my $mdtmf = $args->{media_dtmfxtract};
147 16         33 my $didit = 0;
148 16         87 for( my $i=0;$i<@$lsocks;$i++ ) {
149 16         29 my $channel = $i;
150 16         40 my $sock = $lsocks->[$i];
151 16         34 my ($timer,$reset_to_blocking);
152              
153             # recv once I get an event on RTP socket
154 16 50       47 if ($sock) {
155 16 50       78 $sock = $sock->[0] if UNIVERSAL::isa( $sock,'ARRAY' );
156             my $receive = sub {
157 2913         9782 my ($writeto,$targs,$didit,$sock) = @_;
158 2913         10170 while (1) {
159 6372         19354 my $buf = _receive_rtp($sock,$writeto,$targs,$didit,$channel);
160 6372 100       32710 defined($buf) or return;
161 3459         6102 CAN_NONBLOCKING or return;
162             }
163 16         231 };
164             $call->{loop}->addFD($sock, EV_READ,
165             [
166             $receive,
167             $writeto,
168             {
169             dtmf_gen => $args->{dtmf_events},
170             dtmf_xtract => $mdtmf && $mdtmf->[$i] && $args->{cb_dtmf}
171 16   50     532 && [ $mdtmf->[$i], $args->{cb_dtmf} ],
172             },
173             \$didit
174             ],
175             'rtp_receive'
176             );
177 16         138 $reset_to_blocking = CAN_NONBLOCKING && $sock->blocking(0);
178             }
179              
180             # sending needs to be done with a timer
181             # ! $addr == call on hold
182 16         354 my $addr = $raddr->[$i];
183 16 50       97 $addr = $addr->[0] if ref($addr);
184 16 50 33     227 if ($addr and my $s_sock = $ssocks->[$i]) {
185 16 50       767 $s_sock = $s_sock->[0] if UNIVERSAL::isa( $s_sock,'ARRAY' );
186 16   100     130 my $cb_done = $args->{cb_rtp_done} || sub { shift->bye };
187             $timer = $call->{dispatcher}->add_timer(
188             0, # start immediately
189             [ \&_send_rtp,$s_sock,$call->{loop},$addr,$readfrom,$channel, {
190             repeat => $repeat || 1,
191 12         60 cb_done => [ sub { invoke_callback(@_) }, $cb_done, $call ],
192             rtp_param => $args->{rtp_param},
193             dtmf_gen => $args->{dtmf_events},
194             dtmf_xtract => $mdtmf && $mdtmf->[$i] && $args->{cb_dtmf}
195             && [ $mdtmf->[$i], $args->{cb_dtmf} ],
196             }],
197 16   100     665 $args->{rtp_param}[2], # repeat timer
      50        
198             'rtpsend',
199             );
200             }
201              
202 16         264 push @{ $call->{rtp_cleanup}}, [ sub {
203 16         46 my ($call,$sock,$timer,$rb) = @_;
204 16 50       56 if ($sock) {
205 16         451 $call->{loop}->delFD($sock);
206 16 50       123 $sock->blocking(1) if $rb;
207             }
208 16 50       373 $timer->cancel() if $timer;
209 16         29 }, $call,$sock,$timer,$reset_to_blocking ];
210             }
211              
212             # on RTP inactivity for at least 10 seconds close connection
213             my $timer = $call->{dispatcher}->add_timer( 10,
214             [ sub {
215 0         0 my ($call,$args,$didit,$timer) = @_;
216 0 0       0 if ( $$didit ) {
217 0         0 $$didit = 0;
218             } else {
219 0         0 DEBUG( 10,"closing call because if inactivity" );
220 0         0 $call->bye;
221 0         0 $timer->cancel;
222             }
223 16         278 }, $call,$args,\$didit ],
224             10,
225             'rtp_inactivity',
226             );
227 16         33 push @{ $call->{ rtp_cleanup }}, [ sub { shift->cancel }, $timer ];
  16         199  
  16         238  
228 16         500 };
229              
230 16         299 return [ $sub,$writeto,$readfrom,$repeat ];
231             }
232              
233             ###########################################################################
234             # Helper to receive RTP and optionally save it to file
235             # Args: ($sock,$writeto,$targs,$didit,$channel)
236             # $sock: RTP socket
237             # $writeto: filename for saving or callback which gets packet as argument
238             # $targs: \%hash to hold state info between calls of this function
239             # $didit: reference to scalar which gets set to TRUE on each received packet
240             # and which gets set to FALSE from a timer, thus detecting inactivity
241             # $channel: index of RTP channel
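242             # Return: $packet | ($packet,$mpt,$seq,$tstamp,$ssrc,$csrc) in list context
243             # $packet: received RTP packet (including header); nothing is returned if no packet was accepted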
244             ###########################################################################
245             sub _receive_rtp {
246 9623     9623   34612 my ($sock,$writeto,$targs,$didit,$channel) = @_;
247              
248 9623         185584 my $from = recv( $sock,my $buf,2**16,0 );
249 9623 100 66     103877 return if ! $from || !defined($buf) || $buf eq '';
      66        
250 5362         38277 DEBUG( 50,"received %d bytes from RTP", length($buf));
251              
252 5362         10376 if(0) {
253             DEBUG( "got data on socket %d %s from %s",fileno($sock),
254             ip_sockaddr2string(getsockname($sock)),
255             ip_sockaddr2string($from));
256             }
257              
258 5362         14834 $$didit = 1;
259 5362         15802 my $packet = $buf;
260              
261 5362         44830 my ($vpxcc,$mpt,$seq,$tstamp,$ssrc) = unpack( 'CCnNN',substr( $buf,0,12,'' ));
262 5362         18812 my $version = ($vpxcc & 0xc0) >> 6;
263 5362 50       16848 if ( $version != 2 ) {
264 0         0 DEBUG( 100,"RTP version $version" );
265             return
266 0         0 }
267             # skip csrc headers
268 5362         13995 my $cc = $vpxcc & 0x0f;
269 5362   33     16647 my $csrc = $cc && substr( $buf,0,4*$cc,'' );
270              
271             # skip extension header
272 5362 50       19007 my $xh = $vpxcc & 0x10 ? (unpack( 'nn', substr( $buf,0,4,'' )))[1] : 0;
273 5362 50       18321 substr( $buf,0,4*$xh,'' ) if $xh;
274              
275             # ignore padding
276 5362 50       12204 my $padding = $vpxcc & 0x20 ? unpack( 'C', substr($buf,-1,1)) : 0;
277 5362 50       21319 my $payload = $padding ? substr( $buf,0,length($buf)-$padding ): $buf;
278              
279 5362         32358 DEBUG( 100,"ch=%d payload=%d/%d pt=%d xh=%d padding=%d cc=%d",
280             $channel, $seq, length($payload), $mpt & 0x7f, $xh, $padding, $cc);
281 5362 50 66     46279 if ( $targs->{rseq} && $seq<= $targs->{rseq}
      33        
282             && $targs->{rseq} - $seq < 60000 ) {
283 0         0 DEBUG( 10,"seq=$seq last=$targs->{rseq} - dropped" );
284 0         0 return;
285             }
286 5362         12916 $targs->{rseq} = $seq;
287              
288 5362 100       21597 if ( ref($writeto)) {
    50          
289             # callback
290 1952         13202 invoke_callback($writeto,$payload,$seq,$tstamp,$channel,$mpt & 0x7f);
291             } elsif ( $writeto ) {
292             # save into file
293 0         0 my $fd = $targs->{fdr};
294 0 0       0 if ( !$fd ) {
295 0 0       0 open( $fd,'>',$writeto ) || die $!;
296 0         0 $targs->{fdr} = $fd
297             }
298 0         0 syswrite($fd,$payload);
299             }
300              
301 5362 100       52008 if ( my $xt = $targs->{dtmf_xtract} ) {
302 4213         20633 my ($sub,$cb) = @$xt;
303 4213 100       21383 if ( my ($event,$duration) = $sub->($packet)) {
304 72         450 DEBUG(40,"received dtmf <$event,$duration>");
305 72         311 $cb->($event,$duration);
306             }
307             }
308              
309 5362 100       35145 return wantarray ? ( $packet,$mpt,$seq,$tstamp,$ssrc,$csrc ): $packet;
310             }
311              
312             ###########################################################################
313             # Helper to read RTP data from file (PCMU 8000) and send them through
314             # the RTP socket
315             # Args: ($sock,$loop,$addr,$readfrom,$channel,$targs,$timer)
316             # $sock: RTP socket
317             # $loop: event loop (used for looptime for timestamp)
318             # $addr: where to send data
319             # $readfrom: filename for reading or callback which will return payload
320             # $channel: index of RTP channel
321             # $targs: \%hash to hold state info between calls of this function
322             # especially 'repeat' holds the number of times this data has to be
323             # sent (<0 means forever) and 'cb_done' holds a [\&sub,@arg] callback
324             # to end the call after sending all data
325             # 'repeat' only makes sense if $readfrom is a filename
326             # $timer: timer which gets canceled once all data are sent
327             # Return: NONE
328             ###########################################################################
329             sub _send_rtp {
330 3622     3622   12804 my ($sock,$loop,$addr,$readfrom,$channel,$targs,$timer) = @_;
331              
332 3622         28222 $targs->{wseq}++;
333 3622         8851 my $seq = $targs->{wseq};
334              
335             # 32 bit timestamp based on seq and packet size
336 3622         16003 my $timestamp = ( $targs->{rtp_param}[1] * $seq ) % 2**32;
337              
338 3622         14731 my @pkt = _generate_dtmf($targs,$seq,$timestamp,0x1234);
339 3622 100 100     23504 if (@pkt && $pkt[0] ne '') {
340 1310         5185 DEBUG( 100,"send DTMF to RTP");
341 1310         693073 send( $sock,$_,0,$addr ) for(@pkt);
342 1310         9968 return;
343             }
344              
345 2312         7302 my $buf;
346             my $rtp_event;
347 2312         0 my $payload_type;
348              
349 2312 50       24947 if ( ref($readfrom) ) {
350             # payload by callback
351 2312         8591 $buf = invoke_callback($readfrom,$seq,$channel);
352 2312 100       56230 if ( !$buf ) {
353 12         55 DEBUG( 50, "no more data from callback" );
354 12 50       88 $timer && $timer->cancel;
355 12         43 invoke_callback( $targs->{cb_done} );
356 12         91 return;
357             }
358 2300 50       9061 ($buf,$payload_type,$rtp_event,$timestamp) = @$buf if ref($buf);
359             } else {
360             # read from file
361 0         0 for(my $tries = 0; $tries<2;$tries++ ) {
362 0   0     0 $targs->{wseq} ||= int( rand( 2**16 ));
363 0         0 my $fd = $targs->{fd};
364 0 0       0 if ( !$fd ) {
365 0 0       0 $targs->{repeat} = -1 if $targs->{repeat} < 0;
366 0 0       0 if ( $targs->{repeat} == 0 ) {
367             # no more sending
368 0         0 DEBUG( 50, "no more data from file" );
369 0 0       0 $timer && $timer->cancel;
370 0         0 invoke_callback( $targs->{cb_done} );
371 0         0 return;
372             }
373              
374 0 0       0 open( $fd,'<',$readfrom ) || die $!;
375 0         0 $targs->{fd} = $fd;
376             }
377 0         0 my $size = $targs->{rtp_param}[1]; # 160 for PCMU/8000
378 0 0       0 last if read( $fd,$buf,$size ) == $size;
379             # try to reopen file
380 0         0 close($fd);
381 0         0 $targs->{fd} = undef;
382 0         0 $targs->{repeat}--;
383             }
384             }
385              
386 2300 50 33     11672 die $! if ! defined $buf or $buf eq '';
387 2300         16371 if (0) {
388             DEBUG(50, "%s -> %s seq=%d ts=%x",
389             ip_sockaddr2string(getsockname($sock)),
390             ip_sockaddr2string($addr),
391             $seq, $timestamp
392             );
393             }
394              
395             # add RTP header
396 2300 50       8783 $rtp_event = 0 if ! defined $rtp_event;
397 2300 50 50     13793 $payload_type = $targs->{rtp_param}[0]||0 # 0 == PCMU 8000
398             if ! defined $payload_type;
399              
400 2300         16659 my $header = pack('CCnNN',
401             0b10000000, # Version 2
402             $payload_type | ( $rtp_event << 7 ) ,
403             $seq, # sequence
404             $timestamp,
405             0x1234, # source ID
406             );
407 2300         13705 DEBUG( 100,"send %d bytes to RTP", length($buf));
408 2300         755498 send( $sock,$header.$buf,0,$addr );
409             }
410              
411             ###########################################################################
412             # Helper to send DTMF
413             # Args: ($targs,$seq,$timestamp,$srcid)
414             # $targs: hash which is shared with _send_rtp and other callbacks, contains
415             # dtmf array with events
416             # $seq,$timestamp,$srcid: parameter for RTP packet
417             # Returns: @pkt
418             # (): no DTMF events to handle
419             # $pkt[0] eq '': DTMF in process, but no data
420             # @pkt: RTP packets to send
421             ###########################################################################
422             sub _generate_dtmf {
423 5475     5475   16802 my ($targs,$seq,$timestamp,$srcid) = @_;
424 5475         11722 my $dtmfs = $targs->{dtmf_gen};
425 5475 100 66     34607 $dtmfs and @$dtmfs or return;
426              
427 1418         5190 while ( @$dtmfs ) {
428 1502         3332 my $dtmf = $dtmfs->[0];
429 1502 100       5168 if ( my $duration = $dtmf->{duration} ) {
430             DEBUG(40,"generate dtmf ".(
431             $dtmf->{sub} ? '' :
432 1490 100       25297 defined $dtmf->{event} ? "<$dtmf->{event},$duration>" :
    100          
433             ""
434             ));
435             my $cb = $dtmf->{sub}
436 1490   66     17332 ||= dtmf_generator($dtmf->{event},$duration,%$dtmf);
437 1490         7890 my @pkt = $cb->($seq,$timestamp,$srcid);
438 1490 100       7070 return @pkt if @pkt;
439             }
440 96         266 shift(@$dtmfs);
441 96 100       1782 if ( my $cb = $dtmf->{cb_final} ) {
442 12         49 invoke_callback($cb,'OK');
443             }
444             }
445 12         41 return;
446             }
447              
448             1;