File Coverage

blib/lib/Coro/Channel.pm
Criterion Covered Total %
statement 18 22 81.8
branch n/a
condition 1 2 50.0
subroutine 6 9 66.6
pod 4 5 80.0
total 29 38 76.3


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             Coro::Channel - message queues
4              
5             =head1 SYNOPSIS
6              
7             use Coro;
8              
9             $q1 = new Coro::Channel ;
10              
11             $q1->put ("xxx");
12             print $q1->get;
13              
14             die unless $q1->size;
15              
16             =head1 DESCRIPTION
17              
18             A Coro::Channel is the equivalent of a unix pipe (and similar to amiga
19             message ports): you can put things into it on one end and read things out
20             of it from the other end. If the capacity of the Channel is maxed out
21             writers will block. Both ends of a Channel can be read/written from by as
22             many coroutines as you want concurrently.
23              
24             You don't have to load C manually, it will be loaded
25             automatically when you C and call the C constructor.
26              
27             =over 4
28              
29             =cut
30              
31             package Coro::Channel;
32              
33 3     3   1185 use common::sense;
  3         9  
  3         15  
34              
35 3     3   149 use Coro ();
  3         8  
  3         52  
36 3     3   626 use Coro::Semaphore ();
  3         5  
  3         1138  
37              
38             our $VERSION = 6.514;
39              
40             sub DATA (){ 0 }
41             sub SGET (){ 1 }
42             sub SPUT (){ 2 }
43              
44             =item $q = new Coro:Channel $maxsize
45              
46             Create a new channel with the given maximum size (practically unlimited
47             if C is omitted or zero). Giving a size of one gives you a
48             traditional channel, i.e. a queue that can store only a single element
49             (which means there will be no buffering, and C will wait until there
50             is a corresponding C call). To buffer one element you have to specify
51             C<2>, and so on.
52              
53             =cut
54              
55             sub new {
56             # we cheat and set infinity == 2*10**9
57 3   50 3   151 bless [
58             [], # initially empty
59             (Coro::Semaphore::_alloc 0), # counts data
60             (Coro::Semaphore::_alloc +($_[1] || 2_000_000_000) - 1), # counts remaining space
61             ]
62             }
63              
64             =item $q->put ($scalar)
65              
66             Put the given scalar into the queue.
67              
68             =cut
69              
70             sub put {
71 12     12 1 104 push @{$_[0][DATA]}, $_[1];
  12         32  
72 12         38 Coro::Semaphore::up $_[0][SGET];
73 12         65 Coro::Semaphore::down $_[0][SPUT];
74             }
75              
76             =item $q->get
77              
78             Return the next element from the queue, waiting if necessary.
79              
80             =cut
81              
82             sub get {
83 12     12 1 150 Coro::Semaphore::down $_[0][SGET];
84 12         51 Coro::Semaphore::up $_[0][SPUT];
85 12         20 shift @{$_[0][DATA]}
  12         33  
86             }
87              
88             =item $q->shutdown
89              
90             Shuts down the Channel by pushing a virtual end marker onto it: This
91             changes the behaviour of the Channel when it becomes or is empty to return
92             C, almost as if infinitely many C elements had been put
93             into the queue.
94              
95             Specifically, this function wakes up any pending C calls and lets
96             them return C, the same on future C calls. C will return
97             the real number of stored elements, though.
98              
99             Another way to describe the behaviour is that C calls will not block
100             when the queue becomes empty but immediately return C. This means
101             that calls to C will work normally and the data will be returned on
102             subsequent C calls.
103              
104             This method is useful to signal the end of data to any consumers, quite
105             similar to an end of stream on e.g. a tcp socket: You have one or more
106             producers that C data into the Channel and one or more consumers who
107             C them. When all producers have finished producing data, a call to
108             C signals this fact to any consumers.
109              
110             A common implementation uses one or more threads that C from
111             a channel until it returns C. To clean everything up, first
112             C the channel, then C the threads.
113              
114             =cut
115              
116             sub shutdown {
117 0     0 1   Coro::Semaphore::adjust $_[0][SGET], 1_000_000_000;
118             }
119              
120             =item $q->size
121              
122             Return the number of elements waiting to be consumed. Please note that:
123              
124             if ($q->size) {
125             my $data = $q->get;
126             ...
127             }
128              
129             is I a race condition but instead works just fine. Note that the
130             number of elements that wait can be larger than C<$maxsize>, as it
131             includes any coroutines waiting to put data into the channel (but not any
132             shutdown condition).
133              
134             This means that the number returned is I the number of calls
135             to C that will succeed instantly and return some data. Calling
136             C has no effect on this number.
137              
138             =cut
139              
140             sub size {
141 0     0 1   scalar @{$_[0][DATA]}
  0            
142             }
143              
144             # this is not undocumented by accident - if it breaks, you
145             # get to keep the pieces
146             sub adjust {
147 0     0 0   Coro::Semaphore::adjust $_[0][SPUT], $_[1];
148             }
149              
150             1;
151              
152             =back
153              
154             =head1 AUTHOR/SUPPORT/CONTACT
155              
156             Marc A. Lehmann
157             http://software.schmorp.de/pkg/Coro.html
158              
159             =cut
160