File Coverage

lib/Sort/External.pm
Criterion Covered Total %
statement 140 140 100.0
branch 39 46 84.7
condition 7 9 77.7
subroutine 15 15 100.0
pod 2 2 100.0
total 203 212 95.7


line stmt bran cond sub pod time code
1             package Sort::External;
2 9     9   399038 use strict;
  9         24  
  9         797  
3 9     9   59 use warnings;
  9         21  
  9         310  
4              
5 9     9   257 use 5.006_001;
  9         35  
  9         605  
6              
7             our $VERSION = '0.18';
8              
9 9     9   49 use XSLoader;
  9         15  
  9         354  
10             XSLoader::load( 'Sort::External', $VERSION );
11              
12 9     9   113000 use File::Temp;
  9         455699  
  9         910  
13 9     9   80 use Fcntl qw( :DEFAULT );
  9         23  
  9         3945  
14 9     9   54 use Carp;
  9         17  
  9         35031  
15              
16             our %instance_vars = (
17             sortsub => undef,
18             working_dir => undef,
19             cache_size => 0,
20             mem_threshold => 1024**2 * 8,
21             line_separator => undef, # no-op, backwards compatibility only
22             );
23              
24             # Maximum number of runs that SortEx object is allowed without triggering
25             # multi-level consolidation at finish().
26             our $MAX_RUNS = 30;
27              
28             sub new {
29 27     27 1 44087 my $class = shift;
30 27   33     197 $class = ref($class) || $class;
31              
32             # Process labeled params.
33 27         209 my %args = %instance_vars;
34 27         118 while (@_) {
35 32         121 my ( $var, $val ) = ( shift, shift );
36             # Back compat: labeled params used to start with a dash.
37 32         92 $var =~ s/^-//;
38 32 50       142 croak("Illegal parameter: '$var'") unless exists $args{$var};
39 32         121 $args{$var} = $val;
40             }
41              
42             # Verify that supplied working directory is valid.
43 27 100       141 if ( defined $args{working_dir} ) {
44 5 50       114 croak("Invalid working_dir '$args{working_dir}'")
45             unless -d $args{working_dir};
46             }
47              
48             # Open a temp file.
49 27         272 my $tempfile_fh = File::Temp->new(
50             DIR => $args{working_dir},
51             UNLINK => 1,
52             );
53              
54 27         19729 return _new( $class, $args{working_dir}, $args{sortsub}, $args{cache_size},
55             $args{mem_threshold}, $tempfile_fh );
56             }
57              
58             my %finish_defaults = (
59             outfile => undef,
60             flags => ( O_CREAT | O_EXCL | O_WRONLY ),
61             );
62              
63             sub finish {
64 23     23 1 1354 my $self = shift;
65 23         94 my $runs = $self->_get_runs;
66              
67             # If we've never flushed the cache, perform the final sort in-memory.
68 23 100       101 if ( !@$runs ) {
69 7         43 my $item_cache = $self->_get_item_cache;
70 7         27 my $sortsub = $self->_get_sortsub;
71 7 100       130 @$item_cache
72             = $sortsub ? sort $sortsub @$item_cache : sort @$item_cache;
73             }
74             else {
75 16         107 $self->_write_item_cache_to_tempfile;
76 11         51 while ( @$runs > $MAX_RUNS ) {
77 2         539 $self->_consolidate;
78             }
79             }
80              
81             # If called with arguments, we must be printing everything to an outfile.
82 18 100       419 if (@_) {
83             # Verify args.
84 4         19 my %args = %finish_defaults;
85 4         15 while (@_) {
86 8         15 my ( $var, $val ) = ( shift, shift );
87             # Back compat: labeled params used to start with a dash.
88 8         22 $var =~ s/^-//;
89 8 50       19 croak("Illegal parameter: '$var'")
90             unless exists $finish_defaults{$var};
91 8         26 $args{$var} = $val;
92             }
93              
94             # Get an outfile and print everything to it.
95 4 50       18 croak('Argument outfile is required') unless defined $args{outfile};
96 4 50       284 sysopen( my $out_fh, $args{outfile}, $args{flags} )
97             or croak("Couldn't open outfile '$args{outfile}': $!");
98 4         20 $self->_finish_to_filehandle($out_fh);
99 4 50       288 close $out_fh or croak("Couldn't close '$args{outfile}': $!");
100             }
101             }
102              
103             sub _finish_to_filehandle {
104 4     4   8 my ( $self, $fh ) = @_;
105 4         16 my $item_cache = $self->_get_item_cache;
106 4         10 do {
107 30         219 print $fh $_ for @$item_cache;
108 30         61 @$item_cache = ();
109 30         67 $self->_gatekeeper; # Refreshes @$item_cache.
110             } while (@$item_cache);
111             }
112              
113             sub _consolidate {
114 2     2   15 my $self = shift;
115 2         8 my $item_cache = $self->_get_item_cache;
116 2         6 my $runs = $self->_get_runs;
117 2         28 my $fh = File::Temp->new(
118             DIR => $self->_get_working_dir,
119             UNLINK => 1,
120             );
121 2         951 my @to_consolidate = @$runs;
122 2         4 @$runs = ();
123 2         3 my @consolidated;
124 2         8 while (@to_consolidate) {
125 4 100       10 my $num_to_splice = @to_consolidate < 10 ? @to_consolidate : 10;
126 4         13 push @$runs, splice( @to_consolidate, 0, 10 );
127 4         7 my $start = tell $fh;
128 4         6 do {
129 33         125 $self->_print_to_sortfile( $item_cache, $fh );
130 33         54 @$item_cache = ();
131 33         62 $self->_gatekeeper; # Refreshes @$item_cache.
132             } while (@$item_cache);
133 4         35 my $run = Sort::External::SortExRun->_new( $fh, $start, tell($fh) );
134 4         9 push @consolidated, $run;
135 4         14 @$runs = ();
136             }
137 2         5 @$runs = @consolidated;
138 2         22 $self->_set_temp_fh($fh);
139             }
140              
141             # Reload the main cache using elements from the individual run caches.
142             #
143             # Examine all SortExRun objects, making sure that they have at least one
144             # recovered element in memory. Find the element among the run caches which we
145             # can guarantee sorts before any element yet to be recovered from disk. Move
146             # all elements from the run caches which sort before or equal to this cutoff
147             # into the main object's cache.
148             sub _gatekeeper {
149 190     190   31444 my $self = shift;
150 190         2999 my $runs = $self->_get_runs;
151 190         618 my $item_cache = $self->_get_item_cache;
152 190         554 my $sortsub = $self->_get_sortsub;
153              
154             # Discard exhausted runs.
155 190 100       337 @$runs = grep { $#{ $_->_get_buffarray } != -1 or $_->_refill_buffer } @$runs;
  1494         1551  
  1494         16550  
156              
157 190 100       928 if ( @$runs == 0 ) {
    100          
158 22         42 @$item_cache = ();
159             }
160             elsif ( @$runs == 1 ) {
161             # If there's only one SortExRun, no need to sort.
162 18         42 my $run = $runs->[0];
163 18         64 my $buffarray = $run->_get_buffarray;
164 18         75 $run->_set_buffarray( [] );
165 18         3683 @$item_cache = @$buffarray;
166             }
167             else {
168             # Choose the cutoff from among the lowest elements present in each
169             # run's cache.
170 150         226 my @on_the_bubble = map { $_->_get_buffarray->[-1] } @$runs;
  1315         6928  
171             @on_the_bubble
172 150 100       978 = $sortsub
173             ? sort $sortsub @on_the_bubble
174             : sort @on_the_bubble;
175 150         5379 my $cutoff = $on_the_bubble[0];
176              
177             # Let all qualified items into the out_batch.
178 150         348 my @out_batch;
179 150         279 for my $run (@$runs) {
180 1315         3447 my $buffarray = $run->_get_buffarray;
181 1315         2788 my $tick = $self->_define_range( $buffarray, $cutoff );
182 1315 100       3236 next if $tick == -1;
183 150         173 my $num_to_splice = $tick + 1;
184 150         607 push @out_batch, splice( @$buffarray, 0, $num_to_splice );
185             }
186              
187             # Refresh the item cache and prepare to return elements.
188 150 100       1095 @$item_cache = $sortsub ? sort $sortsub @out_batch : sort @out_batch;
189             }
190              
191 190         1727 $self->_set_fetch_tick(0);
192 190         892 return;
193             }
194              
195             # Compare two elements using either standard lexical comparison or the sortsub
196             # provided to the object's constructor.
197             sub _compare {
198 3652     3652   14932 my ( $self, $item_a, $item_b ) = @_;
199 3652         16761 my $sortsub = $self->_get_sortsub;
200 3652 100       7590 if ( defined $sortsub ) {
201 2141         4194 local $a = $item_a;
202 2141         2992 local $b = $item_b;
203 2141         7246 return $sortsub->( $a, $b );
204             }
205             else {
206 1511         5753 return $item_a cmp $item_b;
207             }
208             }
209              
210             # Flush the items in the input cache to a tempfile, sorting as we go.
211             sub _write_item_cache_to_tempfile {
212 196     196   6302 my $self = shift;
213 196         485 my $item_cache = $self->_get_item_cache;
214 196         477 my $sortsub = $self->_get_sortsub;
215              
216 196 100       652 return unless @$item_cache;
217              
218             # Print the sorted cache to the tempfile.
219 187 100       10546 @$item_cache = $sortsub ? sort $sortsub @$item_cache : sort @$item_cache;
220 187         1821 my $tempfile_fh = $self->_get_tempfile_fh;
221 187         289 my $start = tell($tempfile_fh);
222 187         5656 $self->_print_to_sortfile( $item_cache, $tempfile_fh );
223              
224             # Add a SortExRun object to the runs array.
225 182         1385 my $run = Sort::External::SortExRun->_new( $tempfile_fh, $start,
226             tell($tempfile_fh) );
227 182         229 push @{ $self->_get_runs }, $run;
  182         767  
228              
229             # Reset cache variables.
230 182         1217 $self->_set_mem_bytes(0);
231 182         2114 $#$item_cache = -1;
232             }
233              
234             # Return the highest index in an array representing an element lower than
235             # a given cutoff.
236             sub _define_range {
237 1337     1337   24304 my ( $self, $array, $target ) = @_;
238 1337         2080 my ( $lo, $mid, $hi ) = ( 0, 0, $#$array );
239              
240             # Binary search.
241 1337         3069 while ( $hi - $lo > 1 ) {
242 969         1299 $mid = ( $lo + $hi ) >> 1;
243 969         2681 my $delta = $self->_compare( $array->[$mid], $target );
244 969 100       5925 if ( $delta < 0 ) { $lo = $mid }
  106 100       554  
    50          
245 841         3059 elsif ( $delta > 0 ) { $hi = $mid }
246 22         61 elsif ( $delta == 0 ) { $lo = $hi = $mid }
247             }
248              
249             # Get that last item in...
250 1337   100     4747 while ( $mid < $#$array
251             and $self->_compare( $array->[ $mid + 1 ], $target ) < 1 )
252             {
253 157         1047 $mid++;
254             }
255 1337   100     8064 while ( $mid >= 0 and $self->_compare( $array->[$mid], $target ) > 0 ) {
256 1588         9275 $mid--;
257             }
258              
259 1337         3114 return $mid;
260             }
261              
262             1;
263              
264             __END__