File Coverage

blib/lib/Stream/Aggregate.pm
Criterion Covered Total %
statement 614 664 92.4
branch 136 198 68.6
condition 19 28 67.8
subroutine 53 55 96.3
pod 1 4 25.0
total 823 949 86.7


line stmt bran cond sub pod time code
1             #
2             # TODO: what this can't handle right now is things like:
3             #
4             # * how many different URLs were there on a per query basis?
5             #
6              
7             package Stream::Aggregate;
8              
9 7     7   105377 use strict;
  7         15  
  7         335  
10 7     7   36 use warnings;
  7         13  
  7         512  
11 6     6   4919 use Hash::Util qw(lock_keys);
  6         23251  
  6         72  
12 6     6   448 use B::Deparse;
  6         12  
  6         282  
13 5     5   28 use List::Util qw(min max minstr maxstr);
  5         9  
  5         567  
14 5     5   4322 use Config::Checker;
  5         239306  
  5         419  
15 5     5   3532 use Stream::Aggregate::Stats;
  5         13  
  5         453  
16 5     5   2534 use Stream::Aggregate::Random;
  5         11  
  5         176  
17 5     5   4849 use List::EvenMoreUtils qw(list_difference_position);
  5         6864  
  5         323  
18 5     5   4515 use Tie::Function::Examples qw(%line_numbers);
  5         8628  
  5         787  
19 5     5   36 use Eval::LineNumbers qw(eval_line_numbers);
  5         18  
  5         223  
20 5     5   30 use Config::YAMLMacros::YAML;
  5         10  
  5         380  
21 5     5   30 use Carp qw(confess);
  5         10  
  5         207  
22 5     5   4664 use List::MoreUtils qw(uniq);
  5         6090  
  5         467  
23 5     5   3575 use Clone qw(clone);
  5         16126  
  5         46873  
24              
25             require Exporter;
26              
27             our @ISA = qw(Exporter);
28             our @EXPORT = qw(generate_aggregation_func);
29             our $VERSION = 0.406;
30              
31             our $suppress_line_numbers = 0;
32              
33             my $prototype_config = <<'END_PROTOTYPE';
34             max_stats_to_keep: '?<4000>Maximum number of stats to keep for mean/stddev etc[INTEGER]'
35             context: '?From $log, return an array describing the current context[CODE]'
36             context_config: '%optional configuration hash for "context" code'
37             context2columns: '?From @current_context, return a hash of columns[CODE]'
38             context2columns_config: '%optional configuration hash for "context2columns" code'
39             stringify_context: '?Turn @currnet_context into an array of strings[CODE]'
40             stringify_context_config: '%optional configuration hash for "stringify_context" code'
41             finalize_result: '?Final opportunity to adjust the return values[CODE]'
42             finalize_result_config: '%optional configuration has for "finalize_result" code'
43             filter: '?Should this result be saved for statistics and counted for counts?[CODE]'
44             filter_config: '%optional configuration hash for "filter" code'
45             filter_early: '?<0>Check the filter early (before figuring out contexts)?[BOOLEAN]'
46             passthrough: '?Any additional items for the output?[CODE]'
47             passthrough_config: '%optional configuration has for "passthrough" code'
48             ephemeral: '%ephemeral columns (column -> code)'
49             ephemeral0: '%ephemeral columns (column -> code, evaluated before "ephemeral")'
50             ephemeral2: '%ephemeral columns (column -> code, evaluated after "ephemeral")'
51             ephemeral3: '%ephemeral columns (column -> code, evaluated after crossproduct has set context (after "ephemeral2"))'
52             output: '%generated output columns (column -> code)'
53             counter: '%counter columns (column -> code)'
54             percentage: '%like a counter, but divided by the number of items'
55             sum: '%summation columns (column -> code)'
56             dominant: '%most frequent (mode) value (column -> code)'
57             mean: '%mean value columns (column -> code)'
58             standard_deviation: '%standard deviaton value columns (column -> code)'
59             median: '%median value columns (column -> code)'
60             min: '%min value columns (column -> code)'
61             max: '%max value columns (column -> code)'
62             minstr: '%minstr value columns (column -> code)'
63             maxstr: '%maxstr value columns (column -> code)'
64             keep: '%list of values to keep'
65             stat: '%statistical columns (using keep, column -> code)'
66             debug: '?<0>Print out the code for debugging'
67             strict: '?<0>enforce strict and warnings for user code'
68             preprocess: '?Code to pre-process the input data[CODE]'
69             item_name: '?<$log>Name of the item variable'
70             new_context: '?Code that is run when there is a new context[CODE]'
71             new_context_config: '%optional configuration hash for "new_context" code'
72             merge: '?Code that is run when merging a subcontext into a parent context[CODE]'
73             merge_config: '%optional configuration hash for "merge" code'
74             reduce: '?Code that is run when reducing the saved data to save memory[CODE]'
75             merge_config: '%optional configuration hash for "reduce" code'
76             crossproduct: '%crossproduct context, keys are existing columns, values are size limits'
77             simplify: '%code to choose new simpler values for over-full columns (column -> code)'
78             combinations: '%code to decide if new crossproduct context ($row) is worth keeping[CODE]'
79             END_PROTOTYPE
80              
81             sub nonblank
82             {
83 0     0 1 0 my $value = shift;
84 0 0       0 return undef unless defined $value;
85 0 0       0 return undef if $value eq '';
86 0         0 return $value;
87             }
88              
89             sub resume_line_numbering
90             {
91 47     47 0 246 my ($pkg, $file, $line) = caller(0);
92 47         254 return sprintf(qq{#line %d "generated-code-interpoloated-after-%s-%d"\n}, $line, $file, $line);
93             }
94              
95             sub generate_aggregation_func
96             {
97 5     5 0 158570 my ($agg_config, $extra, $user_extra) = @_;
98              
99 5         33 validate_aggregation_config($agg_config);
100              
101 5         8036 my $renumber = ! $agg_config->{debug};
102              
103             # input data
104 5         16 my $itemref;
105             my $last_item;
106              
107             #
108             # if counting URLs, then the @current_context might be something like:
109             # 'com', 'apple', '/movies', '/action'
110             # If counting queries it might be something like:
111             # 'homicide', 'movies'
112             #
113             # @contexts is an array of state variables ($ps) that correspond to the
114             # elements of @current_context. @context_strings is a string-ified
115             # copy of @current_context to handle contexts which are references.
116             #
117             # $count_this is the return value from &$filter_func;
118             #
119 0         0 my @contexts;
120 0         0 my @context_strings;
121 0         0 my @current_context;
122 0         0 my $ps;
123 0         0 my $oldps;
124 5         11 my $count_this = 1;
125 5         48 my @items_seen = ( 0 );
126 5         11 my %cross_context;
127 5         10 my $cross_data = {};
128 5         14 my @cross_keys;
129 5         11 my $cross_limit = 1;
130 5         8 my $cross_count = 0;
131 5         14 my %cross_key_values;
132             my %persist;
133 0         0 my @combinations;
134              
135             # output
136 0         0 my $row;
137 0         0 my $suppress_result;
138              
139             # reduce data to limit memory use
140 0         0 my @keepers;
141 0         0 my @tossers;
142 5         14 my $max_stats2keep = $agg_config->{max_stats_to_keep};
143 5         15 my $do_reduce;
144              
145             # closures
146             my $get_context_func;
147 0         0 my $count_func;
148 0         0 my $initialize_func;
149 0         0 my $final_values_func;
150 0         0 my $merge_func;
151 0         0 my $context_columns_func;
152 0         0 my $preprocess_func;
153 0         0 my $filter_func;
154 0         0 my $stringify_func;
155 0         0 my $finalize_result_func;
156 0         0 my $passthrough_func;
157 0         0 my $user_merge_func;
158 0         0 my $user_new_context_func;
159 0         0 my $user_reduce_func;
160 0         0 my $cross_reduce_func;
161 0         0 my $new_ps_func;
162 0         0 my $process_func;
163 0         0 my $finish_context_func;
164 0         0 my $finish_cross_func;
165 0         0 my $add_context_component_func;
166 0         0 my $cross_key_reduce_func;
167 5         12 my $declarations = '';
168 5         10 my %combination_funcs;
169             my $do_combinations;
170              
171 5 50       27 my $strict = $agg_config->{strict}
172             ? "use strict; use warnings;"
173             : "no strict; no warnings;";
174              
175             my $eval_line_numbers = $agg_config->{debug}
176 0     0   0 ? sub { $_[0] }
177 5 50       25 : \&eval_line_numbers;
178              
179 5 100 66     34 if ($agg_config->{crossproduct} && keys %{$agg_config->{crossproduct}}) {
  3         31  
180 3         8 @cross_keys = sort keys %{$agg_config->{crossproduct}};
  3         22  
181 3         14 for my $k (@cross_keys) {
182 9         26 $cross_limit *= $agg_config->{crossproduct}{$k};
183             }
184             }
185              
186             my $compile_config = sub {
187 5     5   13 my %varname;
188             my $reduce_func;
189 0         0 my %s;
190 0         0 my %var_types;
191 0         0 my %var_value;
192              
193 5         435 my $deparse = B::Deparse->new("-p", "-sC");
194              
195             #
196             # A more sophisticated approach would figure out the dependencies of one value on
197             # another and order them appropriately. What's going on here is kinda hit & miss.
198             #
199             my $alias_varname = sub {
200 54         87 my ($cc, $value) = @_;
201 54         320 $varname{"\$column_$cc"} = $value;
202 5         56 };
203             my $usercode_inner = sub {
204             #
205             # The {precount} undef statements may not be required.
206             # They are there to be safe, just in case someone is referencing
207             # a column that hasn't had its value assigned yet. If so,
208             # they'll always get undef rather than a left-over value from
209             # a previous input record.
210             #
211 62         96 my ($cctype, $cc, $cc_code) = @_;
212 62 100       143 if (! defined($cc_code)) {
213 8         18 $declarations .= "my \$column_$cc;\n";
214 8         18 $s{precount} .= "\tundef \$column_$cc;\n";
215 8         17 return;
216             }
217 54 100       190 return $alias_varname->($cc, $varname{$cc_code}) if $varname{$cc_code};
218 35         59 my $original = $cc_code;
219 35 50       107 return $alias_varname->($cc, $varname{$cc_code}) if $varname{$cc_code};
220 35 100       118 $cc_code =~ s/(\$column_\w+)/defined($varname{$1}) ? $varname{$1} : $1/ge;
  18         122  
221 35 100       242 if ($cc_code =~ /\breturn\b/) {
    100          
    50          
222 5         43 $cc_code =~ s/^/\t\t/mg;
223 5 50       40 $s{user} .= qq{#line 3001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber;
224 5         19 $s{user} .= "my \$${cctype}_${cc}_func = sub {\n";
225 5         11 $s{user} .= $cc_code;
226 5         10 $s{user} .= "};\n\n";
227 5         16 $s{precount} .= "\tundef \$column_$cc;\n";
228 5         13 $s{$cctype} .= "\t\$column_$cc = ";
229 5         15 $s{$cctype} .= qq{\$${cctype}_${cc}_func->();\n};
230             } elsif ($cc_code =~ /[;\n]/) {
231 7         44 $cc_code =~ s/^/\t\t/mg;
232 7         26 $s{precount} .= "\tundef \$column_$cc;\n";
233 7         21 $s{$cctype} .= "\t\$column_$cc = ";
234 7         12 $s{$cctype} .= "do {\n";
235 7 50       51 $s{$cctype} .= qq{#line 4001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber;
236 7         26 $s{$cctype} .= $cc_code;
237 7         18 $s{$cctype} .= "\n\t};\n";
238             } elsif ($cc_code =~ /\A\$(column_\w+)\Z/) {
239 0         0 die "value of $cc_code isn't available yet, please compute it in an earlier step like 'ephemeral0'";
240             } else {
241 23 50       159 $s{$cctype} .= qq{#line 5001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber;
242 23         123 $s{precount} .= "\tundef \$column_$cc;\n";
243 23         74 $s{$cctype} .= "\t\$column_$cc = $cc_code;\n";
244             }
245 35         87 $declarations .= "my \$column_$cc;\n";
246              
247 5     5   42 my $te = eval "no strict; no warnings; sub { $cc_code }";
  5     5   9  
  5     5   216  
  5     5   27  
  5     4   8  
  5     4   549  
  5     4   41  
  5     4   10  
  5     4   197  
  5     4   27  
  5     4   10  
  5     3   441  
  4     3   30  
  4     3   63  
  4     2   1106  
  4     2   24  
  4     2   96  
  4     2   295  
  4     2   25  
  4     1   8  
  4     1   370  
  4     1   27  
  4     1   6  
  4     1   229  
  4         25  
  4         8  
  4         355  
  4         27  
  4         8  
  4         197  
  4         26  
  4         7  
  4         232  
  3         15  
  3         8  
  3         193  
  3         20  
  3         5  
  3         215  
  3         19  
  3         5  
  3         161  
  2         15  
  2         5  
  2         188  
  2         13  
  2         3  
  2         120  
  2         11  
  2         5  
  2         115  
  2         13  
  2         5  
  2         108  
  2         32  
  2         4  
  2         103  
  1         4  
  1         2  
  1         89  
  1         5  
  1         1  
  1         30  
  1         4  
  1         5  
  1         64  
  1         7  
  1         2  
  1         37  
  1         5  
  1         3  
  1         76  
  35         3717  
248 35 50       120 die "eval $cctype/$cc: $original ($cc_code): $@" if $@;
249 35         175956 my $body = $deparse->coderef2text($te);
250 35 50       197 return $varname{$body} if $varname{$body};
251 35         217 $varname{$body} = $varname{$cc_code} = $varname{$original} = "\$column_$cc";
252 35         140 $alias_varname->($cc, $varname{$cc_code});
253 5         93 };
254             my $usercode = sub {
255 62         116 my ($cctype, $cc, $cc_code) = @_;
256 62         116 my $value = $usercode_inner->(@_);
257 62         141 $var_value{$cc} = $value;
258 62         111 $var_types{$cc} = $cctype;
259 62         201 return $value;
260 5         30 };
261              
262 5         16 my %seen;
263             my $cc;
264              
265 5         43 my @all_data = qw(ephemeral0 ephemeral ephemeral2 ephemeral3 keep output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat);
266 5         28 my @lock_data = qw( keep output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat);
267 5         17 my @output_cols = qw( output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat);
268 5         26 my @kept_cols = qw( keep standard_deviation median dominant );
269 5         12 my @stats_cols = qw( standard_deviation median dominant );
270 5         12 my @cross_cols = qw(ephemeral0 ephemeral ephemeral2 );
271 5         15 my %cross_cols;
272 5         20 @cross_cols{@cross_cols} = @cross_cols;
273              
274             #
275             # Compile all the user code for the various columns
276             #
277 5         16 for my $ucc (@all_data) {
278 90 100       270 next unless $agg_config->{$ucc};
279 34         51 for $cc (sort keys %{$agg_config->{$ucc}}) {
  34         169  
280 62 50       214 die "column $cc is duplicated" if $seen{$cc}++;
281 62         197 $usercode->($ucc, $cc, $agg_config->{$ucc}{$cc});
282             }
283             }
284              
285             #
286             # 'keep' has to be first because 'stat' can't rewrite names
287             #
288 5         12 my %donekeep;
289 5         12 my $has_keepers = 0;
290 5         16 for my $keepers (@kept_cols) {
291 20         31 for $cc (sort keys %{$agg_config->{$keepers}}) {
  20         91  
292 13 100       51 next if $donekeep{$varname{$agg_config->{$keepers}{$cc}}};
293 10         31 $donekeep{$varname{$agg_config->{$keepers}{$cc}}} = $cc;
294 10         29 $s{initialize} .= "\t\$ps->{keep}{$cc} = [];\n";
295 10         45 $s{keeper2} .= "\t\tpush(\@{\$ps->{keep}{$cc}}, $varname{$agg_config->{$keepers}{$cc}}) if \$count_this;\n";
296 10         32 $s{merge} .= "\tpush(\@{\$ps->{keep}{$cc}}, \@{\$oldps->{keep}{$cc}});\n";
297 10         28 $s{reduce2} .= "\t\@{\$ps->{keep}{$cc}} = \@{\$ps->{keep}{$cc}}[\@keepers];\n";
298 10         24 $has_keepers++;
299             }
300             }
301 5 100       20 if ($has_keepers) {
302 4         12 $s{initialize} .= "\t# has keepers\n";
303 4         10 $s{initialize} .= "\t\$ps->{numeric} = {};\n";
304              
305 4         33 $s{fv_setup} .= "\t# has keepers\n";
306 4         9 $s{fv_setup} .= "\tlocal(\$Stream::Aggregate::Stats::ps) = \$ps;\n";
307              
308 4 50       31 $s{keeper1} .= resume_line_numbering if $renumber;
309 4         17 $s{keeper1} .= "\t# has keepers\n";
310 4         10 $s{keeper1} .= "\tmy \$random = rand(1);\n";
311 4         13 $s{keeper1} .= "\tif (\@{\$ps->{random}} < $max_stats2keep || \$random < \$ps->{random}[0]) {\n";
312 4         10 $s{keeper1} .= "\t\tpush(\@{\$ps->{random}}, \$random);\n";
313              
314 4 50       21 $s{keeper3} .= resume_line_numbering if $renumber;
315 4         10 $s{keeper3} .= "\t\t# has keepers\n";
316 4         15 $s{keeper3} .= "\t\t&\$reduce_func if \@{\$ps->{random}} > $max_stats2keep * 1.5;\n";
317 4         9 $s{keeper3} .= "\t}\n";
318              
319 4 50       20 $s{merge} .= resume_line_numbering if $renumber;
320 4         10 $s{merge} .= "\t# has keepers\n";
321 4         9 $s{merge} .= "\tpush(\@{\$ps->{random}}, \@{\$oldps->{random}});\n";
322              
323 4 50       43 $s{merge2} .= resume_line_numbering if $renumber;
324 4         9 $s{merge2} .= "\t# has keepers\n";
325 4         22 $s{merge2} .= "\t&\$reduce_func if \@{\$ps->{random}} > $max_stats2keep * 1.5;\n";
326              
327 4         30 $s{reduce} .= $eval_line_numbers->(<<'END_REDUCE');
328             # has keepers
329             my $random = $ps->{random};
330             @keepers = sort { $random->[$a] cmp $random->[$b] } 0..$#$random;
331             @tossers = splice(@keepers, $max_stats2keep);
332             @$random = @$random[@keepers];
333             END_REDUCE
334 4 50       89 $s{reduce} .= resume_line_numbering if $renumber;
335             }
336              
337 5         11 for $cc (sort keys %{$agg_config->{output}}) {
  5         26  
338 5         24 $s{initialize} .= "\t\$ps->{output}{$cc} = 0;\n";
339             }
340              
341 5         14 for $cc (sort keys %{$agg_config->{counter}}) {
  5         23  
342 1         3 $s{initialize} .= "\t\$ps->{counter}{$cc} = 0;\n";
343 1         6 $s{count2} .= "\t\$ps->{counter}{$cc}++ if $varname{$agg_config->{counter}{$cc}};\n";
344 1         12 $s{merge} .= "\t\$ps->{counter}{$cc} += \$oldps->{counter}{$cc};\n";
345             }
346              
347 5         12 for $cc (sort keys %{$agg_config->{percentage}}) {
  5         23  
348 1         5 $s{initialize} .= "\t\$ps->{percentage}{$cc} = undef;\n";
349 1         5 $s{stat} .= "\t\$ps->{percentage}{$cc} = \$ps->{percentage_counter}{$cc} * 100 / (\$ps->{percentage_total}{$cc} || .001);\n";
350 1         3 $s{initialize} .= "\t\$ps->{percentage_counter}{$cc} = 0;\n";
351 1         4 $s{initialize} .= "\t\$ps->{percentage_total}{$cc} = 0;\n";
352 1         5 $s{count2} .= "\t\$ps->{percentage_counter}{$cc}++ if $varname{$agg_config->{percentage}{$cc}};\n";
353 1         5 $s{count2} .= "\t\$ps->{percentage_total}{$cc}++ if defined $varname{$agg_config->{percentage}{$cc}};\n";
354 1         4 $s{merge} .= "\t\$ps->{percentage_counter}{$cc} += \$oldps->{percentage_counter}{$cc};\n";
355 1         4 $s{merge} .= "\t\$ps->{percentage_total}{$cc} += \$oldps->{percentage_total}{$cc};\n";
356             }
357              
358 5         14 for $cc (sort keys %{$agg_config->{sum}}) {
  5         25  
359 3         14 $s{initialize} .= "\t\$ps->{sum}{$cc} = 0;\n";
360 3         13 $s{count2} .= "\t\$ps->{sum}{$cc} += $varname{$agg_config->{sum}{$cc}};\n";
361 3         18 $s{merge} .= "\t\$ps->{sum}{$cc} += \$oldps->{sum}{$cc};\n";
362             }
363              
364 5         88 for $cc (sort keys %{$agg_config->{mean}}) {
  5         21  
365 7         26 $s{initialize} .= "\t\$ps->{mean}{$cc} = undef;\n";
366 7         36 $s{stat} .= "\t\$ps->{mean}{$cc} = \$ps->{mean_sum}{$cc} / (\$ps->{mean_count}{$cc} || 100);\n";
367 7         24 $s{initialize} .= "\t\$ps->{mean_sum}{$cc} = 0;\n";
368 7         24 $s{initialize} .= "\t\$ps->{mean_count}{$cc} = 0;\n";
369 7         32 $s{count2} .= "\tif (defined($varname{$agg_config->{mean}{$cc}})) {\n";
370 7         28 $s{count2} .= "\t \$ps->{mean_sum}{$cc} += $varname{$agg_config->{mean}{$cc}};\n";
371 7         22 $s{count2} .= "\t \$ps->{mean_count}{$cc}++;\n";
372 7         13 $s{count2} .= "\t}\n";
373 7         26 $s{merge} .= "\t\$ps->{mean_sum}{$cc} += \$oldps->{mean_sum}{$cc};\n";
374 7         28 $s{merge} .= "\t\$ps->{mean_count}{$cc} += \$oldps->{mean_count}{$cc};\n";
375             }
376              
377 5         15 for $cc (sort keys %{$agg_config->{min}}) {
  5         21  
378 2         7 $s{initialize} .= "\t\$ps->{min}{$cc} = undef;\n";
379 2         11 $s{count2} .= "\t\$ps->{min}{$cc} = min grep { defined } \$ps->{min}{$cc}, $varname{$agg_config->{min}{$cc}};\n";
380 2         9 $s{merge} .= "\t\$ps->{min}{$cc} = min grep { defined } \$ps->{min}{$cc}, \$oldps->{min}{$cc};\n";
381             }
382              
383 5         22 for $cc (sort keys %{$agg_config->{minstr}}) {
  5         25  
384 0         0 $s{initialize} .= "\t\$ps->{minstr}{$cc} = undef;\n";
385 0         0 $s{count2} .= "\t\$ps->{minstr}{$cc} = minstr grep { defined } \$ps->{minstr}{$cc}, $varname{$agg_config->{minstr}{$cc}};\n";
386 0         0 $s{merge} .= "\t\$ps->{minstr}{$cc} = minstr grep { defined } \$ps->{minstr}{$cc}, \$oldps->{minstr}{$cc};\n";
387             }
388              
389 5         13 for $cc (sort keys %{$agg_config->{max}}) {
  5         21  
390 3         11 $s{initialize} .= "\t\$ps->{max}{$cc} = undef;\n";
391 3         14 $s{count2} .= "\t\$ps->{max}{$cc} = max grep { defined } \$ps->{max}{$cc}, $varname{$agg_config->{max}{$cc}};\n";
392 3         14 $s{merge} .= "\t\$ps->{max}{$cc} = max grep { defined } \$ps->{max}{$cc}, \$oldps->{max}{$cc};\n";
393             }
394              
395 5         15 for $cc (sort keys %{$agg_config->{maxstr}}) {
  5         21  
396 0         0 $s{initialize} .= "\t\$ps->{maxstr}{$cc} = undef;\n";
397 0         0 $s{count2} .= "\t\$ps->{maxstr}{$cc} = maxstr grep { defined } \$ps->{maxstr}{$cc}, $varname{$agg_config->{maxstr}{$cc}};\n";
398 0         0 $s{merge} .= "\t\$ps->{maxstr}{$cc} = maxstr grep { defined } \$ps->{maxstr}{$cc}, \$oldps->{maxstr}{$cc};\n";
399             }
400              
401 5         15 for my $statc (@stats_cols) {
402 15         22 for $cc (sort keys %{$agg_config->{$statc}}) {
  15         60  
403 6   50     33 my $keepcc = $donekeep{$varname{$agg_config->{$statc}{$cc}}} || die;
404 6         33 $s{initialize} .= "\t\$ps->{$statc}{$cc} = undef;\n";
405 6         32 $s{stat} .= "\t\$ps->{$statc}{$cc} = $statc('$keepcc');\n";
406             }
407             }
408              
409 5         12 for $cc (sort keys %{$agg_config->{stat}}) {
  5         20  
410 3         11 $s{stat} .= "\t\$ps->{stat}{$cc} = $varname{$agg_config->{stat}{$cc}};\n";
411 3         7 $s{initialize} .= "\t\$ps->{stat}{$cc} = undef;\n";
412             }
413              
414 5         13 for my $cc (sort keys %{$agg_config->{output}}) {
  5         18  
415 5         22 $s{initialize} .= "\t\$ps->{output}{$cc} = undef;\n";
416 5         24 $s{stat} .= "\t\$ps->{output}{$cc} = $varname{$agg_config->{output}{$cc}};\n";
417             }
418              
419 5         16 for my $icol (@lock_data) {
420 70         206 $s{initialize} .= "\tlock_keys(%{\$ps->{$icol}});\n"
421 70 100       69 if keys %{$agg_config->{$icol}};
422             }
423              
424 5         14 for my $ctype (@output_cols) {
425 65         65 for $cc (sort keys %{$agg_config->{$ctype}}) {
  65         166  
426 31         103 $s{final_values} .= "\t\$row->{$cc} = \$ps->{$ctype}{$cc};\n";
427             }
428             }
429 5 100       32 $s{final_values} .= "\t&\$finalize_result_func;\n" if $agg_config->{finalize_result};
430              
431 5         12 my $code = $strict;
432 5 50       32 $code .= qq{\n#line 1 "FAKE-all-code-for-$extra->{name}"\n} if $renumber;
433 5         22 $code .= qq{\nmy $agg_config->{item_name};\n};
434 5         20 $code .= $declarations;
435 5         8 $code .= "{\n";
436              
437 5         28 $s{reduce} .= "\t&\$user_reduce_func;\n";
438              
439             my $assemble_code = sub {
440 50         203 my ($func, @keys) = @_;
441 50         54 my $something;
442 50         61 my $c = "# ---------------------------------------------------------------\n";
443 50 100       188 $c .= "\$${func}_func = sub {\n"
444             if $func;
445 50         72 for my $s (@keys) {
446 200 100       419 next unless exists $s{$s};
447 114 50       468 $c .= qq{\n#line 1001 "FAKEFUNC-$extra->{name}-$func-$s"\n} if $renumber;
448 114         209 $c .= $s{$s};
449 114         169 delete $s{$s};
450 114         189 $something = 1;
451             }
452 50 100 100     218 $c .= "\t0\n"
453             if $func && ! $something;
454 50 100       114 $c .= "};\n"
455             if $func;
456 50         373 return $c;
457 5         37 };
458              
459             #
460             # Cross product aggregation & counts
461             #
462 5 100       34 if (@cross_keys) {
    50          
463 3         6 my $esub = '';
464 3         7 my $newsub = '';
465 3         5 my $oldsub = '';
466 3         7 my $loop_in = '';
467 3         5 my $loop_in2 = '';
468 3         6 my $loop_in3 = '';
469 3         4 my $loop_in3a = '';
470 3         7 my $loop_out = '';
471 3         5 my $loop_out2 = '';
472 3         5 my $loop_indent = "";
473 3         6 my $loop_head = '';
474 3         4 my $loop_mid = '';
475 3         6 my $loop_mid3 = '';
476 3         6 my $loop_dbug_old = '';
477 3         6 my $loop_dbug_new = '';
478 3         7 for my $cc (@cross_keys) {
479 9 50       27 die "Crossproduct column '$cc' doesn't exist" unless $var_types{$cc};
480 9 50       25 die "Crossproduct column '$cc' ($var_types{$cc}) isn't a valid type (@cross_cols)" unless $cross_cols{$var_types{$cc}};
481              
482 9   100     46 my $cc_code = $agg_config->{simplify}{$cc} || 'return "*";';
483 9         54 $s{user} .= "my \$simplify_$cc = sub {\n";
484 9 50       42 $s{user} .= qq{#line 3001 "FAKE-$extra->{name}-simplify-$cc"\n} if $renumber;
485 9         19 $s{user} .= "\t".$cc_code;
486 9         12 $s{user} .= "\n};\n";
487              
488 9         28 $loop_head .= "\tmy %key_count_$cc;\n";
489              
490 9         27 $loop_mid .= "\tmy \$key_map_$cc = \$cross_key_reduce_func->('$cc', \\%key_count_$cc, \$simplify_$cc);\n";
491 9         25 $loop_mid3 .= ", $cc => \$key_$cc";
492              
493 9         18 $loop_dbug_old .= " $cc:\$key_$cc";
494 9         16 $loop_dbug_new .= " $cc:\$new_$cc";
495              
496 9         23 $loop_in2 .= "$loop_indent for my \$key_$cc (keys %{\$cross_data$oldsub}) {\n";
497              
498 9         34 $loop_in .= "$loop_indent for my \$key_$cc (keys %{\$cross_data$oldsub}) {\n";
499 9         24 $loop_in .= "$loop_indent my \$new_$cc = \$key_$cc;\n";
500 9         18 $loop_in .= "$loop_indent my \$must_inc = 0;\n";
501 9         19 $loop_in .= "$loop_indent if (exists \$key_map_${cc}->{\$key_$cc}) {\n";
502 9         21 $loop_in .= "$loop_indent \$new_$cc = \$key_map_${cc}->{\$key_$cc};\n";
503 9         22 $loop_in .= "$loop_indent \$must_inc = 1;\n";
504 9         14 $loop_in .= "$loop_indent \$must_do++;\n";
505 9         13 $loop_in .= "$loop_indent } else {\n";
506 9         19 $loop_in .= "$loop_indent \$new_$cc = \$key_$cc;\n";
507 9         16 $loop_in .= "$loop_indent }\n";
508              
509 9         20 $loop_in3a .= "\$key_count_${cc}{\$key_$cc}++;\n";
510              
511 9         15 $loop_out .= "$loop_indent }\n";
512 9         16 $loop_out .= "$loop_indent \$must_do -= \$must_inc;\n";
513 9         14 $loop_out2 .= "$loop_indent }\n";
514              
515 9         17 $loop_indent .= "\t";
516              
517 9         23 $esub .= "->{$var_value{$cc}}";
518 9         18 $newsub .= "->{\$new_$cc}";
519 9         26 $oldsub .= "->{\$key_$cc}";
520             };
521 3         16 for my $in3a (split(/\n/, $loop_in3a)) {
522 9         43 $loop_in3 .= "$loop_indent $in3a\n";
523             }
524              
525 3         21 $loop_out = join("\n", reverse split(/\n/, $loop_out)) . "\n";
526 3         554 $loop_out2 = join("\n", reverse split(/\n/, $loop_out2)) . "\n";
527              
528             #
529             # Reduce the number of contexts
530             #
531              
532             $cross_key_reduce_func = sub {
533 9         727 my ($keyname, $valcounts, $simplify_func) = @_;
534 9         11 my %ret;
535 9 100       52 if (keys %$valcounts > $agg_config->{crossproduct}{$keyname}) {
536 2         5 $do_reduce = 1;
537 2         6 my $limit = $agg_config->{crossproduct}{$keyname};
538 2         7 my $current = keys %$valcounts;
539 2         2 my %seen;
540             my %new;
541 2         12 for my $val (sort { $valcounts->{$a} <=> $valcounts->{$b} } keys %$valcounts) {
  33         68  
542 15 100       31 if ($current > $limit) {
543 12         28 my $new = $simplify_func->($val, $keyname);
544 12 50       39 next if $new eq $val;
545 12 100       36 if ($seen{$new}++) {
546 10         12 $current--;
547             }
548 12         14 $new{$new}++;
549 12 50       24 if ($new{$val}) {
550             # we can't throw this one away since we have new
551             # users... we may not be able to meet our contract.
552 0 0       0 $current-- unless --$seen{$new};
553 0         0 $new{$new}--;
554 0         0 next;
555             } else {
556 12         25 $ret{$val} = $new;
557             }
558             }
559             }
560             }
561 9 50       27 print STDERR YAML::Dump("reduce $keyname", \%ret) if $agg_config->{debug} > 2;
562 9         30 return \%ret;
563 3         26 };
564              
565 3         8 my $db1 = '';
566 3         5 my $db2 = '';
567 3 50       18 $db1 = qq{print STDERR "Merging\t$loop_dbug_old (\$cross_data${oldsub}->{item_counter})\tinto\t$loop_dbug_new\t\$cross_count\\n";} if $agg_config->{debug};
568 3 50       13 $db2 = qq{print STDERR "Moving\t$loop_dbug_old (\$cross_data${oldsub}->{item_counter})\tto\t$loop_dbug_new\\n";} if $agg_config->{debug};
569 3 50       36 $s{cross_reduce} .= resume_line_numbering if $renumber;
570              
571 3         11 $s{cross_reduce} .= "\t\$do_reduce = 0;\n";
572 3         7 $s{cross_reduce} .= "\tmy \$must_do = 0;\n";
573 3         14 $s{cross_reduce} .= $loop_head;
574 3         9 $s{cross_reduce} .= $loop_in2;
575 3         8 $s{cross_reduce} .= $loop_in3;
576 3         14 $s{cross_reduce} .= $loop_out2;
577 3         8 $s{cross_reduce} .= $loop_mid;
578 3         8 $s{cross_reduce} .= $loop_in;
579 3         42 $s{cross_reduce} .= $eval_line_numbers->(<
580             # --------------- reduce -------------
581             if (\$must_do) {
582             if (\$cross_data$newsub) {
583             \$cross_count--;
584             $db1
585             \$ps = \$cross_data$newsub;
586             \$oldps = delete \$cross_data$oldsub;
587             #
588             # print STDERR "ABOUT TO MERGE: \$key_color \$key_size \$key_style \$oldps\\n";
589             # print STDERR YAML::Dump("Pre-mege cross-data", \$cross_data);
590             #
591             &\$merge_func;
592             \$ps = \$contexts[-1];
593             } else {
594             $db2
595             \$cross_data$newsub = delete \$cross_data$oldsub;
596             }
597             }
598             # --------------- reduce -------------
599             END_CR
600 3 50       216 $s{cross_reduce} .= resume_line_numbering if $renumber;
601 3         9 $s{cross_reduce} .= $loop_out;
602              
603             #
604             # Add data to the right context
605             #
606 3         12 my $db3 = '';
607 3 50       19 $db3 = qq{print STDERR "Cross-count: \$cross_count\\n";} if $agg_config->{debug} > 3;
608 3         27 $s{crossproduct} .= $eval_line_numbers->(<
609             if (\$cross_count > \$cross_limit * 2) {
610             &\$cross_reduce_func;
611             }
612             if (\$cross_data$esub) {
613             \$ps = \$cross_data$esub;
614             } else {
615             &\$new_ps_func;
616             \$cross_data$esub = \$ps;
617             \$cross_count++;
618             $db3
619             }
620             END_CP
621 3 50       54 $s{crossproduct} .= resume_line_numbering if $renumber;
622              
623             #
624             # handle combinations
625             #
626 3         11 $s{inner_combine} = '';
627 3         6 $s{outer_combine} = '';
628 3 100       15 if ($agg_config->{combinations}) {
629 2         3 my $generate_combinations;
630 2         4 my $combination_number = 0;
631 2         5 my %mapping;
632             $generate_combinations = sub {
                # Recursive generator for the 'combinations' feature.  It does not
                # combine anything itself: it appends the text of Perl code to
                # $s{$output_field}, to be compiled later with the rest of %s.
                #
                #   $output_field - key of %s that receives the generated text
                #   $input_ps     - text of a Perl expression naming the source $ps;
                #                   it is interpolated into the generated code
                #   $indent       - prefix placed in front of every generated line
                #   $keys         - arrayref of candidate key names
                #   $done         - hashref of keys already dropped on this path
                #   $loop_over    - false on the initial call; the recursive calls
                #                   below pass the key that was just dropped
633 12         29 my ($output_field, $input_ps, $indent, $keys, $done, $loop_over) = @_;
634 12         18 my $out = \$s{$output_field};
                # $loopout collects the closing braces for the loops opened below;
                # $x and $y are extra indentation that grows with nesting.
635 12         16 my $loopout = '';
636 12         17 my $x = '';
637 12         14 my $y = '';
                # Only keys that have a 'combinations' entry in the config and are
                # not yet in %$done take part at this level.
638 12 100       25 my @loop_keys = grep { $agg_config->{combinations}{$_} && ! $done->{$_} } sort @$keys;
  17         107  
                # Recursive calls are queued here and run only after this level
                # has finished emitting its own code.
639 12         30 my @delayed_call;
640 12 100       35 if ($loop_over) {
                # One nested generated "for" loop per remaining key; each loop
                # variable $ck_<key> becomes another subscript on $input_ps.
641 10         13 for my $k (@loop_keys) {
642 11         29 $$out .= "$indent$x for my \$ck_$k (keys %{$input_ps}) {\n";
643 11         22 $loopout = "$indent$x }\n$loopout";
644 11         14 $x .= "\t";
645 11         24 $input_ps .= "{\$ck_$k}";
646             }
647 10         14 $y = $x;
                # With no keys left there is no loop, so guard the body with a
                # generated "if" on the source expression instead.
648 10 100       20 if (! @loop_keys) {
649 2         5 $y .= "\t";
650 2         7 $$out .= "$indent$x if ($input_ps) {\n";
651             }
652 10         30 $$out .= "$indent$y \$row = ${input_ps}->{row};\n";
653             }
                # For each key $cc, emit code that folds the source into the
                # combination which drops $cc and keeps the other keys.
654 12         18 for my $cc (@loop_keys) {
655 16         26 my @keeping = grep { $_ ne $cc } @loop_keys;
  30         63  
                # %mapping is declared outside this closure, so it is shared by
                # every invocation: each distinct kept-key set is generated once.
656 16 100       71 if ($mapping{"@keeping"}++) {
657 6         15 $$out .= "$indent$x # we've already handled keeping '@keeping'\n";
658 6         104 next;
659             }
660              
                # Subscript text that selects the target slot: built from the
                # row's values for the kept keys ...
661 11         38 my $accessor = @keeping
662 10 100       26 ? "{" . join("}{", map { "\$row->{'$_'}" } @keeping) . "}"
663             : '';
664              
                # ... or, inside generated loops, from the loop variables.
665 10 100 100     50 if ($loop_over && @keeping) {
666 3         4 $accessor = "{" . join("}{", map { "\$ck_$_" } @keeping) . "}"
  3         13  
667             }
668              
669             # yes, we're using auto-vivification. It's ugly, but simplifies
670             # the code.
671              
672 10         15 $$out .= "\n";
673 10         18 $$out .= "$indent$x # combine, dropping $cc";
674 10 100       32 $$out .= ", keeping: @keeping" if @keeping;
675 10         13 $$out .= "\n";
676              
                # Generated code: if the user's combination function for $cc
                # returns true, merge into the existing slot via $merge_func,
                # or else seed the slot with a clone of the source whose row
                # has the $cc column removed.
677 10         20 $$out .= "$indent$x if (\$combination_funcs{'$cc'}->()) {\n";
678 10         23 $$out .= "$indent$x if (\$combinations[$combination_number]$accessor) {\n";
679 10         19 $$out .= "$indent$x local(\$Stream::Aggregate::Stats::ps)\n";
680 10         13 $$out .= "$indent$x = \$ps\n";
681 10         19 $$out .= "$indent$x = \$combinations[$combination_number]$accessor;\n";
                # No assignment is emitted when the source expression already
                # is '$oldps'.
682 10 100       29 $$out .= "$indent$x \$oldps = $input_ps;\n" if $input_ps ne '$oldps';
683 10         34 $$out .= "$indent$x &\$merge_func;\n";
684 10         16 $$out .= "$indent$x } else {\n";
685 10         27 $$out .= "$indent$x \$ps = \$combinations[$combination_number]$accessor = clone($input_ps);\n";
686 10         17 $$out .= "$indent$x \$ps->{row} = { %\$row };\n";
687 10         17 $$out .= "$indent$x delete \$ps->{row}{'$cc'};\n";
688 10         14 $$out .= "$indent$x }\n";
689 10         89 $$out .= "$indent$x }\n";
690              
                # Capture this slot number for the queued call before the shared
                # counter moves on.
691 10         16 my $pnum = $combination_number++;
692              
                # Queue the next level: it reads from the slot just generated,
                # writes to 'outer_combine', and treats $cc as done.
693             push(@delayed_call, sub {
694 10         101 $generate_combinations->(
695             outer_combine => "\$combinations[$pnum]",
696             "",
697             \@keeping,
698             { %$done, $cc => 1 },
699             $cc);
700 10         50 });
701             }
702 12 100       26 if ($loop_over) {
                # Generated code that produces the output row for this level:
                # select the $ps, run $final_values_func, and push $row onto
                # @$retref unless the result was suppressed.
703 10         15 $$out .= "\n";
704 10         27 $$out .= "$indent$y # final values with '@loop_keys' keys\n";
705 10         44 $$out .= "$indent$y local(\$Stream::Aggregate::Stats::ps) = \$ps = ${input_ps};\n";
706 10         19 $$out .= "$indent$y \$suppress_result = 0;\n";
707 10         14 $$out .= "$indent$y \$final_values_func->();\n";
708 10         26 $$out .= "$indent$y push(\@\$retref, \$row) unless \$suppress_result;\n";
709 10         13 $$out .= "\n";
                # Close the generated "if" guard opened above when there were
                # no loop keys.
710 10 100       22 if (! @loop_keys) {
711 2         5 $$out .= "$indent$x }\n";
712             }
713              
                # Close every generated "for" loop opened above.
714 10         14 $$out .= $loopout;
715             }
                # Now run the queued recursive calls, in the order they were added.
716 12         94 while (my $dc = shift(@delayed_call)) {
717 10         21 $dc->();
718             }
719              
720 2         33 };
721 2         11 $generate_combinations->(
722             inner_combine => '$oldps',
723             "\t\t\t",
724             \@cross_keys,
725             {},
726             undef);
727             }
728              
729             #
730             # Return the cross product results
731             #
732 3 50       15 $s{finish_cross} .= qq{print STDERR "Finish cross called\n";} if $agg_config->{debug} > 7;
733 3 50       34 $s{finish_cross} .= qq{print STDERR YAML::Dump('cross_data-before',\$cross_data);\n} if $agg_config->{debug} > 8;
734 3         8 $s{finish_cross} .= "\tmy (\$retref) = shift;\n";
735 3         6 $s{finish_cross} .= "\tmy \$rowtmp;\n";
736 3         7 $s{finish_cross} .= "\t&\$cross_reduce_func;\n";
737 3 50       12 $s{finish_cross} .= qq{print STDERR YAML::Dump('cross_data-after',\$cross_data);\n} if $agg_config->{debug} > 8;
738 3         6 $s{finish_cross} .= $loop_in2;
739 3         26 $s{finish_cross} .= $eval_line_numbers->(<
740             # --------------- finish cross -------------
741             local(\$Stream::Aggregate::Stats::ps)
742             = \$ps
743             = \$cross_data$oldsub;
744             confess unless \$ps;
745             \$suppress_result = 0;
746             \$rowtmp = \$row = { &\$context_columns_func $loop_mid3 };
747             &\$final_values_func;
748             push(@\$retref, \$row) unless \$suppress_result;
749             \$oldps = delete \$cross_data$oldsub;
750             \$oldps->{row} = \$row;
751             \$ps = \$contexts[-1];
752             &\$merge_func if \$ps;
753             \$cross_count--;
754             $db3
755             END_FC
756 3         60 $s{finish_cross} .= delete $s{inner_combine};
757 3         7 $s{finish_cross} .= "\t\t\t\t# --------------- finish cross -------------\n";
758 3 50       16 $s{finish_cross} .= resume_line_numbering if $renumber;
759 3         7 $s{finish_cross} .= $loop_out2;
760 3         17 $s{finish_cross} .= delete $s{outer_combine};
761             } elsif ($agg_config->{combinations}) {
762 0         0 die "combinations requires crossproduct which isn't defined";
763             }
764              
765 5         22 $code .= $eval_line_numbers->(<<'END_FIELDS');
766              
767             my $compile_user_code = sub {
768             my ($c, $field, $config_key, $default) = @_;
769             return $default unless defined $c->{$field};
770             my $config = $c->{$config_key} || {}; # maybe used by eval
771             my $coderef;
772             my $code = $strict;
773             $code .= qq{\n#line 2001 "FAKE-$extra->{name}-$field"\n} if $renumber;
774             $code .= qq{sub { $c->{$field} }; };
775 4     4   28 my $sub = eval $code;
  4     4   8  
  4     4   161  
  4     4   40  
  4     4   7  
  4     4   929  
  4         25  
  4         11  
  4         232  
  4         22  
  4         8  
  4         760  
  4         22  
  4         8  
  4         145  
  4         104  
  4         8  
  4         715  
776             die "Cannot compile user code for $extra->{name}/$field: $@\n$code" if $@;
777             return $coderef if $coderef;
778             return $sub;
779             };
780              
781             $get_context_func = $compile_user_code->($agg_config, 'context', 'context_config', sub { return () });
782             $context_columns_func = $compile_user_code->($agg_config, 'context2columns', 'context2columns_config', sub { return () });
783             $filter_func = $compile_user_code->($agg_config, 'filter', 'filter_config', sub { 1 });
784             $preprocess_func = $compile_user_code->($agg_config, 'preprocess', 'preprocess_config', sub {});
785             $stringify_func = $compile_user_code->($agg_config, 'stringify_context', 'stringify_context_config', sub { map { ref($_) ? Dump($_) : $_ } @_ });
786             $finalize_result_func = $compile_user_code->($agg_config, 'finalize_result', 'finalize_result_config', sub {});
787             $passthrough_func = $compile_user_code->($agg_config, 'passthrough', 'passthrough_config', sub { return () });
788             $user_new_context_func = $compile_user_code->($agg_config, 'new_context', 'new_context_config', sub { return () });
789             $user_merge_func = $compile_user_code->($agg_config, 'merge', 'merge_config', sub { return () });
790             $user_reduce_func = $compile_user_code->($agg_config, 'reduce', 'reduce_config', sub { return () });
791              
792             if ($agg_config->{crossproduct} && $agg_config->{combinations}) {
793             for my $crosskey (uniq(keys(%{$agg_config->{crossproduct}}), keys(%{$agg_config->{combinations}}))) {
794             $combination_funcs{$crosskey} = $compile_user_code->($agg_config->{combinations}, $crosskey, "combine on $crosskey", sub { 0 });
795             }
796             }
797              
798             END_FIELDS
799 5         122 $code .= "\t\$itemref = \\$agg_config->{item_name};\n";
800 5         18 $code .= "}\n";
801              
802             #
803             # New context ($ps) allocator
804             #
805              
806 5         14 $s{new_ps} .= "\t\$ps = {};\n";
807 5         12 $s{new_ps} .= "\t\$ps->{item_counter} = 0;\n";
808 5 100       41 $s{new_ps} .= "\t\$ps->{heap} = {};\n"
809             if Dump($agg_config) =~ /\{heap\}/;
810 5 100       1060 if ($has_keepers) {
811 4         13 $s{new_ps} .= "\t\$ps->{random} = [];\n";
812 4         12 $s{new_ps} .= "\t\$ps->{sidestats} = {};\n"; # for Stream::Aggregate::Stats
813             }
814 5 100       21 $s{new_ps} .= "\t\$ps->{unfiltered_counter} = 0;\n" if $agg_config->{filter};
815 5 50       24 $s{new_ps} .= "\t&\$initialize_func;\n" if $s{initialize};
816 5 50       18 $s{new_ps} .= "\t&\$user_new_context_func;\n" if $agg_config->{new_context};
817 5         14 $s{new_ps} .= "\t\$ps->{row} = undef;\n";
818 5         8 $s{new_ps} .= "\tlock_keys(%\$ps);\n";
819              
820             #
821             # main processing loop, generated for execution efficiency
822             #
823              
824 5 50       17 $s{process} .= "\t\$last_item = \$\$itemref;\n"
825             if Dump($agg_config) =~ /\$last_item\b/;
826 5         701 $s{process} .= $eval_line_numbers->(<<'END_P0');
827             $last_item = $$itemref;
828             ($$itemref) = @_;
829             my @ret;
830             unless ($$itemref) {
831             $finish_cross_func->(\@ret) if keys %$cross_data;
832             $finish_context_func->(\@ret)
833             while @contexts;
834             return @ret;
835             }
836             END_P0
837 5 100       93 $s{process} .= $eval_line_numbers->(<<'END_P1') if $agg_config->{preprocess};
838              
839             &$preprocess_func;
840              
841             END_P1
842 5 100 66     48 $s{process} .= $eval_line_numbers->(<<'END_P2') if $agg_config->{filter} && $agg_config->{filter_early};
843              
844             $count_this = &$filter_func;
845             END_P2
846 5 50       44 $s{process} .= $eval_line_numbers->(<<'END_P3') if $agg_config->{passthrough};
847              
848             push(@ret, &$passthrough_func);
849              
850             END_P3
851              
852 5 100       20 if ($agg_config->{context}) {
853 2 50 33     11 $s{process} .= $eval_line_numbers->(<<'END_P4') if $agg_config->{filter} && $agg_config->{filter_early};
854              
855             if ($count_this) {
856              
857             END_P4
858 2         7 $s{process} .= $eval_line_numbers->(<<'END_P5');
859              
860             my @new_context = &$get_context_func;
861             my @new_strings = $stringify_func->(@new_context);
862              
863             my $diffpos = list_difference_position(@new_strings, @context_strings);
864              
865             if (defined $diffpos) {
866             $finish_context_func->(\@ret)
867             while @current_context >= $diffpos;
868             }
869              
870             while (@new_context > @current_context) {
871             $add_context_component_func->($new_context[@current_context], $new_strings[@current_context]);
872             }
873             END_P5
874              
875 2 50 33     37 $s{process} .= $eval_line_numbers->(<<'END_P7') if $agg_config->{filter} && $agg_config->{filter_early};
876             }
877              
878             END_P7
879             }
880              
# Late filtering: with filter_early off, the filter is evaluated only at
# this point in the generated process code.
$s{process} .= $eval_line_numbers->(<<'END_P7A') if $agg_config->{filter} && ! $agg_config->{filter_early};

	$count_this = &$filter_func;
END_P7A

# Open the guard around the counting code whenever a filter is configured.
# P9 below emits the matching closing brace under exactly this condition.
# The previous test (filter && ! context) skipped the opener when a context
# was configured; the P4/P7 pair around the context code opens and closes
# its own block, so the generated code ended up with an unmatched '}' and
# failed to compile for any filter + context configuration.
$s{process} .= $eval_line_numbers->(<<'END_P7B') if $agg_config->{filter};
	if ($count_this) {
END_P7B

# Count the item in the current $ps.
$s{process} .= $eval_line_numbers->(<<'END_P8');
	&$count_func;
	$ps->{item_counter}++;
END_P8

# This closes the if ($count_this) opened in P7B; every item, filtered or
# not, is then tallied in unfiltered_counter.
$s{process} .= $eval_line_numbers->(<<'END_P9') if $agg_config->{filter};
	}
	$ps->{unfiltered_counter}++;
END_P9
900              
901 5         38 $s{process} .= $eval_line_numbers->(<<'END_P10');
902             return @ret;
903             END_P10
904 5 50       81 $s{process} .= resume_line_numbering if $renumber;
905              
906             #
907             # Merge contexts func
908             #
909              
910 5 50       26 $s{merge0} .= "print STDERR YAML::Dump('MERGE', \$ps, \$oldps);\n" if $agg_config->{debug} > 11;
911 5 50       28 $s{merge0} .= resume_line_numbering if $renumber;
912 5         15 $s{merge0} .= "\t\$ps->{item_counter} += \$oldps->{item_counter};\n";
913 5 100       24 $s{merge0} .= "\t\$ps->{unfiltered_counter} += \$oldps->{unfiltered_counter};\n" if $agg_config->{filter};
914 5 50       23 $s{merge3} .= resume_line_numbering if $renumber;
915 5         11 $s{merge3} .= "\t&\$user_merge_func;\n";
916              
917 5 50       21 $s{fv_setup} .= "print STDERR YAML::Dump('final_values', \$ps);\n" if $agg_config->{debug} > 12;
918              
919 5         32 $code .= $assemble_code->('', qw(user));
920 5         34 $code .= $assemble_code->('merge', qw(merge0 merge merge2 merge3));
921 5         20 $code .= $assemble_code->('cross_reduce', qw(cross_reduce));
922 5         46 $code .= $assemble_code->('finish_cross', qw(finish_cross));
923 5         16 $code .= $assemble_code->('new_ps', qw(new_ps));
924 5         18 $code .= $assemble_code->('process', qw(process));
925 5         18 $code .= $assemble_code->('initialize', qw(initialize));
926 5         19 $code .= $assemble_code->('final_values', qw(fv_setup output stat final_values));
927 5         92 $code .= $assemble_code->('count', qw(precount count ephemeral0 ephemeral ephemeral2 crossproduct ephemeral3 keep standard_deviation median dominant counter percentage sum mean median min minstr max maxstr count2 keeper1 keeper2 keeper3 ));
928 5         19 $code .= $assemble_code->('reduce', qw(reduce reduce2));
929 5 50       27 die "INTERNAL ERROR: ".join(' ', keys %s) if keys %s;
930              
931 5 50       24 if ($suppress_line_numbers) {
932 0         0 $code =~ s/^#line \d+ ".*"\s*?\n//mg;
933             }
934              
935 5 50       20 print STDERR $line_numbers{$code}."\n" if $agg_config->{debug};
936              
937 5     5   36 eval $code;
  5     5   15  
  5         183  
  5         25  
  5         8  
  5         13586  
  5         614  
938 5 50       6951 die "$@\n$line_numbers{$code}" if $@;
939              
940 5         342 };
941              
942 5         19 &$compile_config;
943              
$add_context_component_func = sub {
	# Open one more level of nested context: allocate a fresh $ps and record
	# the component, its string form and the new $ps on the parallel stacks.
	my ($new_component, $new_string) = @_;

	# Called in the &$code; form, so the callee shares this sub's @_.
	&$new_ps_func;

	# keep @contexts and @current_context together
	push @current_context, $new_component;
	push @context_strings, $new_string;
	push @contexts, $ps;

	# Bump the counter at the new depth, drop anything recorded deeper than
	# that, then add a zeroed slot for the next level down.
	my $depth = $#contexts;
	$items_seen[$depth]++;
	$#items_seen = $depth;
	push @items_seen, 0;
};
958              
$finish_context_func = sub {
	# Close the innermost context: flush its cross-product data, compute its
	# output row, pop it off the stacks and merge it into its parent.
	# $_[0] is the arrayref that collects result rows.
	my $retref = $_[0];

	@contexts or die;

	if ($agg_config->{debug} > 5) {
		print STDERR "about to call finish cross\n";
	}
	$finish_cross_func->($retref);

	@contexts or die;
	ref $ps or confess;

	$suppress_result = 0;
	# The &$code form (no parentheses) shares this sub's @_ with the callee.
	$row = { &$context_columns_func };
	&$final_values_func;

	# keep @contexts and @current_context together
	$oldps = pop @contexts;
	pop @current_context;
	pop @context_strings;

	# The parent context, if any, becomes current and absorbs the old one.
	$ps = $contexts[-1];
	&$merge_func if $ps;

	push @$retref, $row unless $suppress_result;
};
988              
989 5         55 return $process_func;
990              
991             }
992              
# Check an aggregation config against $prototype_config.
# The checker is built by eval'ing the source text returned by
# config_checker_source; a compile failure is re-thrown unchanged.
# Returns whatever the checker returns.
sub validate_aggregation_config
{
	my $agg_config = shift;

	my $checker = eval config_checker_source;
	if ($@) {
		die $@;
	}

	return $checker->($agg_config, $prototype_config, '- Stream::Aggregate config');
}
1000              
1001             1;
1002