File Coverage

blib/lib/Coro/Util.pm
Criterion Covered Total %
statement 30 82 36.5
branch 0 24 0.0
condition n/a
subroutine 10 17 58.8
pod 4 4 100.0
total 44 127 34.6


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             Coro::Util - various utility functions.
4              
5             =head1 SYNOPSIS
6              
7             use Coro::Util;
8              
9             =head1 DESCRIPTION
10              
11             This module implements various utility functions, mostly replacing perl
12             functions by non-blocking counterparts.
13              
14             Many of these functions exist for the sole purpose of emulating existing
15             interfaces, no matter how bad or limited they are (e.g. no IPv6 support).
16              
17             This module is an AnyEvent user. Refer to the L
18             documentation to see how to integrate it into your own programs.
19              
20             =over 4
21              
22             =cut
23              
24             package Coro::Util;
25              
26 1     1   521 use common::sense;
  1         2  
  1         5  
27              
28 1     1   551 use Socket ();
  1         2591  
  1         22  
29              
30 1     1   6 use AnyEvent ();
  1         2  
  1         11  
31 1     1   430 use AnyEvent::Socket ();
  1         13764  
  1         28  
32              
33 1     1   6 use Coro::State;
  1         3  
  1         31  
34 1     1   665 use Coro::Handle;
  1         3  
  1         42  
35 1     1   321 use Coro::Storable ();
  1         4  
  1         19  
36 1     1   5 use Coro::AnyEvent ();
  1         2  
  1         13  
37 1     1   4 use Coro::Semaphore;
  1         1  
  1         18  
38              
39 1     1   4 use base 'Exporter';
  1         2  
  1         736  
40              
41             our @EXPORT = qw(gethostbyname gethostbyaddr);
42             our @EXPORT_OK = qw(inet_aton fork_eval);
43              
44             our $VERSION = 6.513;
45              
46             our $MAXPARALLEL = 16; # max. number of parallel jobs
47              
48             my $jobs = new Coro::Semaphore $MAXPARALLEL;
49              
50             sub _do_asy(&;@) {
51 0     0     my $sub = shift;
52 0           $jobs->down;
53 0           my $fh;
54              
55 0           my $pid = open $fh, "-|";
56              
57 0 0         if (!defined $pid) {
    0          
58 0           die "fork: $!";
59             } elsif (!$pid) {
60 0           syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
  0            
61 0           Coro::Util::_exit 0;
62             }
63              
64 0           my $buf;
65 0           my $wakeup = Coro::rouse_cb;
66 0           my $w; $w = AE::io $fh, 0, sub {
67 0 0   0     sysread $fh, $buf, 16384, length $buf
68             and return;
69              
70 0           undef $w;
71 0           $wakeup->();
72 0           };
73              
74 0           Coro::rouse_wait;
75              
76 0           $jobs->up;
77 0           my @r = map { pack "H*", $_ } split /\0/, $buf;
  0            
78 0 0         wantarray ? @r : $r[0];
79             }
80              
81             =item $ipn = Coro::Util::inet_aton $hostname || $ip
82              
83             Works almost exactly like its C counterpart, except
84             that it does not block other coroutines.
85              
86             Does not handle multihomed hosts or IPv6 - consider using
87             C with the L rouse functions
88             instead.
89              
90             =cut
91              
92             sub inet_aton {
93 0     0 1   AnyEvent::Socket::inet_aton $_[0], Coro::rouse_cb;
94 0           (grep length == 4, Coro::rouse_wait)[0]
95             }
96              
97             =item gethostbyname, gethostbyaddr
98              
99             Work similarly to their Perl counterparts, but do not block. Uses
100             C internally.
101              
102             Does not handle multihomed hosts or IPv6 - consider using
103             C or C
104             with the L rouse functions instead.
105              
106             =cut
107              
108             sub gethostbyname($) {
109 0     0 1   AnyEvent::Socket::inet_aton $_[0], Coro::rouse_cb;
110              
111 0           ($_[0], $_[0], &Socket::AF_INET, 4, map +(AnyEvent::Socket::format_address $_), grep length == 4, Coro::rouse_wait)
112             }
113              
114             sub gethostbyaddr($$) {
115 0     0     _do_asy { gethostbyaddr $_[0], $_[1] } @_
116 0     0 1   }
117              
118             =item @result = Coro::Util::fork_eval { ... }, @args
119              
120             Executes the given code block or code reference with the given arguments
121             in a separate process, returning the results. The return values must be
122             serialisable with Coro::Storable. It may, of course, block.
123              
124             Note that using event handling in the sub is not usually a good idea as
125             you will inherit a mixed set of watchers from the parent.
126              
127             Exceptions will be correctly forwarded to the caller.
128              
129             This function is useful for pushing cpu-intensive computations into a
130             different process, for example to take advantage of multiple CPU's. Its
131             also useful if you want to simply run some blocking functions (such as
132             C) and do not care about the overhead enough to code your own
133             pid watcher etc.
134              
135             This function might keep a pool of processes in some future version, as
136             fork can be rather slow in large processes.
137              
138             You should also look at C, which is newer and
139             more compatible to totally broken Perl implementations such as the one
140             from ActiveState.
141              
142             Example: execute some external program (convert image to rgba raw form)
143             and add a long computation (extract the alpha channel) in a separate
144             process, making sure that never more then $NUMCPUS processes are being
145             run.
146              
147             my $cpulock = new Coro::Semaphore $NUMCPUS;
148              
149             sub do_it {
150             my ($path) = @_;
151              
152             my $guard = $cpulock->guard;
153              
154             Coro::Util::fork_eval {
155             open my $fh, "convert -depth 8 \Q$path\E rgba:"
156             or die "$path: $!";
157              
158             local $/;
159             # make my eyes hurt
160             pack "C*", unpack "(xxxC)*", <$fh>
161             }
162             }
163              
164             my $alphachannel = do_it "/tmp/img.png";
165              
166             =cut
167              
168             sub fork_eval(&@) {
169 0     0 1   my ($cb, @args) = @_;
170              
171 0 0         pipe my $fh1, my $fh2
172             or die "pipe: $!";
173              
174 0           my $pid = fork;
175              
176 0 0         if ($pid) {
    0          
177 0           undef $fh2;
178              
179 0           my $res = Coro::Storable::thaw +(Coro::Handle::unblock $fh1)->readline (undef);
180 0           waitpid $pid, 0; # should not block, we expect the child to simply behave
181              
182 0 0         die $$res unless "ARRAY" eq ref $res;
183              
184 0 0         return wantarray ? @$res : $res->[-1];
185              
186             } elsif (defined $pid) {
187 0           delete $SIG{__WARN__};
188 0           delete $SIG{__DIE__};
189             # just in case, this hack effectively disables event processing
190             # in the child. cleaner and slower would be to canceling all
191             # event watchers, but we are event-model agnostic.
192 0           undef $Coro::idle;
193 0           $Coro::current->prio (Coro::PRIO_MAX);
194              
195 0           eval {
196 0           undef $fh1;
197              
198 0           my @res = eval { $cb->(@args) };
  0            
199              
200 0 0         open my $fh, ">", \my $buf
201             or die "fork_eval: cannot open fh-to-buf in child: $!";
202 0 0         Storable::store_fd $@ ? \"$@" : \@res, $fh;
203 0           close $fh;
204              
205 0           syswrite $fh2, $buf;
206 0           close $fh2;
207             };
208              
209 0 0         warn $@ if $@;
210 0           Coro::Util::_exit 0;
211              
212             } else {
213 0           die "fork_eval: $!";
214             }
215             }
216              
217             # make sure store_fd is preloaded
218             eval { Storable::store_fd undef, undef };
219              
220             1;
221              
222             =back
223              
224             =head1 AUTHOR/SUPPORT/CONTACT
225              
226             Marc A. Lehmann
227             http://software.schmorp.de/pkg/Coro.html
228              
229             =cut
230