File Coverage

blib/lib/Perlbal/AIO.pm
Criterion Covered Total %
statement 63 138 45.6
branch 16 44 36.3
condition 1 23 4.3
subroutine 21 31 67.7
pod 0 16 0.0
total 101 252 40.0


line stmt bran cond sub pod time code
1             # AIO abstraction layer
2             #
3             # Copyright 2004, Danga Interactive, Inc.
4             # Copyright 2005-2007, Six Apart, Ltd.
5              
6             package Perlbal::AIO;
7              
8 22     22   136 use strict;
  22         51  
  22         1129  
9 22     22   124 use POSIX qw(ENOENT EACCES EBADF);
  22         39  
  22         141  
10 22     22   2204 use Fcntl qw(SEEK_CUR SEEK_SET SEEK_END O_RDWR O_CREAT O_TRUNC);
  22         55  
  22         2000  
11              
12             # Try and use IO::AIO, if it's around.
13             BEGIN {
14 22     22   2692 $Perlbal::OPTMOD_IO_AIO = eval "use IO::AIO 1.6 (); 1;";
  22     22   61167  
  0         0  
  0         0  
15             }
16              
17             END {
18 22 50   22   14813260 IO::AIO::max_parallel(0)
19             if $Perlbal::OPTMOD_IO_AIO;
20             }
21              
22             $Perlbal::AIO_MODE = "none";
23             $Perlbal::AIO_MODE = "ioaio" if $Perlbal::OPTMOD_IO_AIO;
24              
25             ############################################################################
26             # AIO functions available to callers
27             ############################################################################
28              
29             sub aio_rename {
30 1     1 0 4 my ($srcpath, $dstpath, $user_cb) = @_;
31             aio_channel_push(get_chan($srcpath), $user_cb, sub {
32 1     1   2 my $cb = shift;
33              
34 1 50       5 if ($Perlbal::AIO_MODE eq "ioaio") {
35 0         0 IO::AIO::aio_rename($srcpath, $dstpath, $cb);
36             } else {
37 1         139 my $rv = rename($srcpath, $dstpath);
38 1 50       6 $rv = $rv ? 0 : -1;
39 1         5 $cb->($rv);
40             }
41 1         3 });
42             }
43              
44             sub aio_readahead {
45 39     39 0 77 my ($fh, $offset, $length, $user_cb) = @_;
46              
47             aio_channel_push(get_chan(), $user_cb, sub {
48 39     39   67 my $cb = shift;
49             # $fh could end up closed.
50 39 50 33     158 if ($Perlbal::AIO_MODE eq "ioaio" && defined fileno($fh)) {
51 0         0 IO::AIO::aio_readahead($fh, $offset, $length, $cb);
52             } else {
53 39         110 $cb->();
54             }
55 39         99 });
56             }
57              
58             sub aio_stat {
59 72     72 0 153 my ($file, $user_cb) = @_;
60              
61             aio_channel_push(get_chan($file), $user_cb, sub {
62 72     72   110 my $cb = shift;
63 72 50       202 if ($Perlbal::AIO_MODE eq "ioaio") {
64 0         0 IO::AIO::aio_stat($file, $cb);
65             } else {
66 72         3372 stat($file);
67 72         887 $cb->();
68             }
69 72         242 });
70             }
71              
72             sub aio_open {
73 83     83 0 220 my ($file, $flags, $mode, $user_cb) = @_;
74              
75             aio_channel_push(get_chan($file), $user_cb, sub {
76 83     83   143 my $cb = shift;
77              
78 83 50       395 if ($Perlbal::AIO_MODE eq "ioaio") {
79 0         0 IO::AIO::aio_open($file, $flags, $mode, $cb);
80             } else {
81 83         147 my $fh;
82 83         67960 my $rv = sysopen($fh, $file, $flags, $mode);
83 83 100       579 $cb->($rv ? $fh : undef);
84             }
85 83         285 });
86             }
87              
88             sub aio_unlink {
89 32     32 0 92 my ($file, $user_cb) = @_;
90             aio_channel_push(get_chan($file), $user_cb, sub {
91 32     32   77 my $cb = shift;
92              
93 32 50       112 if ($Perlbal::AIO_MODE eq "ioaio") {
94 0         0 IO::AIO::aio_unlink($file, $cb);
95             } else {
96 32         29564 my $rv = unlink($file);
97 32 100       153 $rv = $rv ? 0 : -1;
98 32         177 $cb->($rv);
99             }
100 32         119 });
101             }
102              
103             sub aio_write {
104             # 0 1 2 3(data) 4
105 85     85 0 213 my ($fh, $offset, $length, undef, $user_cb) = @_;
106 85 50       226 return no_fh($user_cb) unless $fh;
107 85         191 my $alist = \@_;
108              
109             aio_channel_push(get_chan(), $user_cb, sub {
110 85     85   166 my $cb = shift;
111 85 50       247 if ($Perlbal::AIO_MODE eq "ioaio") {
112 0         0 IO::AIO::aio_write($fh, $offset, $length, $alist->[3], 0, $cb);
113             } else {
114 85         25581 my $old_off = sysseek($fh, 0, SEEK_CUR);
115 85         359 sysseek($fh, $offset, 0);
116 85         25958 my $rv = syswrite($fh, $alist->[3], $length, 0);
117 85         564 sysseek($fh, $old_off, SEEK_SET);
118 85         444 $cb->($rv);
119             }
120 85         512 });
121             }
122              
123             sub aio_read {
124             # 0 1 2 3(data) 4
125 0     0 0 0 my ($fh, $offset, $length, undef, $user_cb) = @_;
126 0 0       0 return no_fh($user_cb) unless $fh;
127 0         0 my $alist = \@_;
128              
129             aio_channel_push(get_chan(), $user_cb, sub {
130 0     0   0 my $cb = shift;
131 0 0       0 if ($Perlbal::AIO_MODE eq "ioaio") {
132 0         0 IO::AIO::aio_read($fh, $offset, $length, $alist->[3], 0, $cb);
133             } else {
134 0         0 my $old_off = sysseek($fh, 0, SEEK_CUR);
135 0         0 sysseek($fh, $offset, 0);
136 0         0 my $rv = sysread($fh, $alist->[3], $length, 0);
137 0         0 sysseek($fh, $old_off, SEEK_SET);
138 0         0 $cb->($rv);
139             }
140 0         0 });
141             }
142              
143             ############################################################################
144             # AIO channel stuff
145             # prevents all AIO threads from being consumed by requests for same
146             # failing/overloaded disk by isolating them into separate 'channels' in
147             # parent process and not dispatching more than the max in-flight count
148             # allows. think of a channel as a named queue. or in reality, a disk.
149             ############################################################################
150              
151             my %chan_outstanding; # $channel_name -> $num_in_flight
152             my %chan_pending; # $channel_name -> [ [$subref, $cb], .... ]
153             my %chan_hitmaxdepth; # $channel_name -> $times_enqueued (not dispatched immediately)
154             my %chan_submitct; # $channel_name -> $times_submitted (total AIO requests for this channel)
155             my $use_aio_chans = 0; # keep them off for now, until mogstored code is ready to use them
156             my $file_to_chan_hook; # coderef that returns $chan_name given a $filename
157              
158             my %chan_concurrency; # $channel_name -> concurrency per channel
159             # (cache. definitive version via function call)
160              
161             sub get_aio_stats {
162 0     0 0 0 my $ret = {};
163 0         0 foreach my $c (keys %chan_outstanding) {
164 0   0     0 $ret->{$c} = {
165             cur_running => $chan_outstanding{$c},
166             ctr_queued => $chan_hitmaxdepth{$c} || 0,
167             ctr_total => $chan_submitct{$c},
168             };
169             }
170              
171 0         0 foreach my $c (keys %chan_pending) {
172 0   0     0 my $rec = $ret->{$c} ||= {};
173 0         0 $rec->{cur_queued} = scalar @{$chan_pending{$c}};
  0         0  
174             }
175              
176 0         0 return $ret;
177             }
178              
179             # (external API). set trans hook, but also enables AIO channels.
180             sub set_file_to_chan_hook {
181 0     0 0 0 $file_to_chan_hook = shift; # coderef that returns $chan_name given a $filename
182 0         0 $use_aio_chans = 1;
183             }
184              
185             # internal API:
186             sub aio_channel_push {
187 312     312 0 542 my ($chan, $user_cb, $action) = @_;
188              
189             # if we were to do it immediately, bypassing AIO channels (future option?)
190 312 50       736 unless ($use_aio_chans) {
191 312         674 $action->($user_cb);
192 312         3485 return;
193             }
194              
195             # IO::AIO/etc only take one callback. so we wrap the user
196             # (caller) function with our own that first calls theirs, then
197             # does our bookkeeping and queue management afterwards.
198             my $wrapped_cb = sub {
199 0     0   0 $user_cb->(@_);
200 0         0 $chan_outstanding{$chan}--;
201 0         0 aio_channel_cond_run($chan);
202 0         0 };
203              
204             # in case this is the first time this queue has been used, init stuff:
205 0   0     0 my $chanpend = ($chan_pending{$chan} ||= []);
206 0   0     0 $chan_outstanding{$chan} ||= 0;
207 0         0 $chan_submitct{$chan}++;
208              
209 0   0     0 my $max_out = $chan_concurrency{$chan} ||= aio_chan_max_concurrent($chan);
210              
211 0 0       0 if ($chan_outstanding{$chan} < $max_out) {
212 0         0 $chan_outstanding{$chan}++;
213 0         0 $action->($wrapped_cb);
214 0         0 return;
215             } else {
216             # too deep. enqueue.
217 0         0 $chan_hitmaxdepth{$chan}++;
218 0         0 push @$chanpend, [$action, $wrapped_cb];
219             }
220             }
221              
222             sub aio_chan_max_concurrent {
223 0     0 0 0 my ($chan) = @_;
224 0 0       0 return 100 if $chan eq '[default]';
225 0         0 return 10;
226             }
227              
228             sub aio_channel_cond_run {
229 0     0 0 0 my ($chan) = @_;
230              
231 0 0       0 my $chanpend = $chan_pending{$chan} or return;
232 0   0     0 my $max_out = $chan_concurrency{$chan} ||= aio_chan_max_concurrent($chan);
233              
234 0         0 my $job;
235 0   0     0 while ($chan_outstanding{$chan} < $max_out && ($job = shift @$chanpend)) {
236 0         0 $chan_outstanding{$chan}++;
237 0         0 $job->[0]->($job->[1]);
238             }
239             }
240              
241             my $next_chan;
242             sub set_channel {
243 0     0 0 0 $next_chan = shift;
244             }
245              
246             sub set_file_for_channel {
247 63     63 0 188 my ($file) = @_;
248 63 50       152 if ($file_to_chan_hook) {
249 0         0 $next_chan = $file_to_chan_hook->($file);
250             } else {
251 63         318 $next_chan = undef;
252             }
253             }
254              
255             # gets currently-set channel, then clears it. or if none set,
256             # lets registered hook set the channel name from the optional
257             # $file parameter. the default channel, '[default]' has no limits
258             sub get_chan {
259 312 50   312 0 2948 return undef unless $use_aio_chans;
260 0           my ($file) = @_;
261 0 0         set_file_for_channel($file) if $file;
262              
263 0 0         if (my $chan = $next_chan) {
264 0           $next_chan = undef;
265 0           return $chan;
266             }
267              
268 0           return "[default]";
269             }
270              
271             ############################################################################
272             # misc util functions
273             ############################################################################
274              
275             sub _fh_of_fd_mode {
276 0     0     my ($fd, $mode) = @_;
277 0 0 0       return undef unless defined $fd && $fd >= 0;
278              
279             #TODO: use the write MODE for the given $mode;
280 0           my $fh = IO::Handle->new_from_fd($fd, 'r+');
281 0           my $num = fileno($fh);
282 0           return $fh;
283             }
284              
285             sub no_fh {
286 0     0 0   my $cb = shift;
287              
288 0           my $i = 1;
289 0           my $stack_trace = "";
290 0           while (my ($pkg, $filename, $line, $subroutine, $hasargs,
291             $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller($i++)) {
292 0           $stack_trace .= " at $filename:$line $subroutine\n";
293             }
294              
295 0           Perlbal::log("crit", "Undef \$fh: $stack_trace");
296 0           $cb->(undef);
297 0           return undef;
298             }
299              
300             1;