File Coverage

blib/lib/IPC/Simple/Channel.pm
Criterion Covered Total %
statement 44 51 86.2
branch 2 2 100.0
condition 3 3 100.0
subroutine 10 13 76.9
pod 0 9 0.0
total 59 78 75.6


line stmt bran cond sub pod time code
1             package IPC::Simple::Channel;
2             $IPC::Simple::Channel::VERSION = '0.07';
3 4     4   23 use strict;
  4         7  
  4         116  
4 4     4   18 use warnings;
  4         6  
  4         85  
5              
6 4     4   16 use AnyEvent qw();
  4         5  
  4         1480  
7              
8             sub new {
9 6     6 0 33 my ($class) = @_;
10              
11 6         88 bless{
12             waiters => [],
13             buffer => [],
14             is_shutdown => 0,
15             }, $class;
16             }
17              
18             sub DESTROY {
19 0     0   0 my $self = shift;
20 0         0 $self->shutdown;
21             }
22              
23             sub shutdown {
24 4     4 0 10 my $self = shift;
25              
26 4         8 $self->{is_shutdown} = 1;
27              
28             # flush any remaining messages that have pending receivers
29 4         22 $self->flush;
30              
31             # send undef to any remaining receivers
32 4         13 $_->send for @{ $self->{waiters} };
  4         18  
33             }
34              
35             sub size {
36 0     0 0 0 my $self = shift;
37 0         0 return scalar @{ $self->{buffer} };
  0         0  
38             }
39              
40             sub put {
41 6     6 0 9 my $self = shift;
42 6         6 push @{ $self->{buffer} }, @_;
  6         13  
43 6         14 $self->flush;
44 6         13 return $self->{size};
45             }
46              
47             sub get {
48 2     2 0 3 my $self = shift;
49 2         9 $self->async->recv;
50             }
51              
52             sub recv {
53 5     5 0 7 my $self = shift;
54 5         12 $self->async->recv;
55             }
56              
57             sub next {
58 0     0 0 0 my $self = shift;
59 0         0 $self->async->recv;
60             }
61              
62             sub async {
63 7     7 0 16 my $self = shift;
64 7         155 my $cv = AnyEvent->condvar;
65              
66 7 100       598 if ($self->{is_shutdown}) {
67 1         2 my $msg = shift @{ $self->{buffer} };
  1         2  
68 1         4 $cv->send($msg);
69 1         9 return $cv;
70             }
71             else {
72 6         11 push @{ $self->{waiters} }, $cv;
  6         23  
73 6         78 $self->flush;
74 6         44 return $cv;
75             }
76             }
77              
78             sub flush {
79 16     16 0 25 my $self = shift;
80 16   100     30 while (@{ $self->{waiters} } && @{ $self->{buffer} }) {
  22         134  
  10         41  
81 6         17 my $cv = shift @{ $self->{waiters} };
  6         14  
82 6         8 $cv->send( shift @{ $self->{buffer} } );
  6         35  
83             }
84             }
85              
86             1;
87              
88             __END__