File Coverage

blib/lib/StoredHash/Bulk.pm
Criterion Covered Total %
statement 12 162 7.4
branch 0 46 0.0
condition 0 18 0.0
subroutine 4 16 25.0
pod 0 7 0.0
total 16 249 6.4


line stmt bran cond sub pod time code
1             # Bulk insert / update
2             # INITIALLY: On Update Assume NO ID changes
3             # (Allow passing via update() call parameters '' => '' - too late ?
4             # Scenario: rename of children when par. id changes (composite key)
5             # TODO: Support AoA (not only AoH) as input ?
6             package StoredHash::Bulk;
7              
8              
9 1     1   3126 use strict;
  1         1  
  1         35  
10 1     1   4 use warnings;
  1         1  
  1         29  
11 1     1   4 use Scalar::Util ('reftype');
  1         8  
  1         57  
12 1     1   4 use Data::Dumper;
  1         1  
  1         1336  
13             # Inherit ???
14             #NOT: our @ISA = ('StoredHash');
15             our $VERSION = '0.30';
16             # Factory method to return bulk-op (insert / update) ready object.
17             # Keyword params:
18             # - dbh - Data Connection (Default: use connection in $shp)
19             # TODO: Allow attribute mapping ?
20             # TODO: Pass 2 persisters ?
21             sub StoredHash::Bulk {
22 0     0 0   my ($p, %c) = @_;
23 0           my $bulk = {'shp' => $p, 'idcache' => undef };
24 0 0         if (ref($_[1]) eq 'StoredHash') {
25             # Assume destination SHP
26             }
27             # Allow overrides in destination
28 0   0       my $dbh = $c{'dbh'} || $p->{'dbh'};
29 0   0       my $table = $c{'table'} || $p->{'table'};
30             # Explicit attrs allow partial updates
31 0           my $attrs = $c{'attrs'};
32             #
33 0 0         if (ref($attrs) ne 'ARRAY') {$attrs = $p->cols();}
  0            
34 0           my @pks = $p->pkeys();
35 0           my @ats;
36 0 0         if (ref($attrs) ne 'ARRAY') {die("No attributes for replication !!!");}
  0            
37             # Clone (or contain HAS-A style) and add a few members ????
38 0           $bulk->{'attrs'} = $attrs;
39             # Generate INSERT/UPDATE and Prepare for bulk-op
40             # Plain SQL (attrs sorted or not ?)
41 0           local $p->{'dbh'} = undef;
42 0           local $p->{'table'} = $table; # Table may be different
43            
44             # TODO: Avoid this by allowing passing explicit attr to insert / update
45             # TODO: Eliminate workaround by sort()
46             # TODO: Any exceptions to attrs ???
47             # TODO: Pass whole %c to insert / update here (or only 'attrs','table')
48 0           my %dummy = map({$_ => 1;} @$attrs);
  0            
49 0           @ats = sort(@$attrs); # Workaround
50             ###### Insert:
51             # Need braches to describe attribute params for particular OP (ins/upd) ?
52             # Ensure that 'attrs' drives the order OR that order is stored.
53 0           $bulk->{'ins'} = $p->insert(\%dummy, 'attrs' => \@ats);
54 0           $bulk->{'insattrs'} = [@ats];
55            
56             ###### Update: ALWAYS Strip ID fields
57             #sub attrs_noids {
58 0           my %ids = map({$_ => 1;} @pks);
  0            
59 0           @ats = grep({!$ids{$_};} @$attrs);
  0            
60             #}
61 0 0         if ($c{'debug'}) {print("Attrs for up: ".Dumper(\@ats));}
  0            
62             # Generate SQL
63 0           $bulk->{'upd'} = $p->update(\%dummy, [@pks], 'attrs' => \@ats);
64             # Rely on attributes in order
65 0           $bulk->{'updattrs'} = [@ats];
66            
67             ##### Prepare Exists sth
68 0           my $whereid = StoredHash::whereid($p);
69 0           my $qe = "SELECT COUNT(*) FROM $table WHERE $whereid";
70             #my $sth = $dbh->prepare($qe); # Delay prep ?
71 0           $bulk->{'exs'} = $qe;
72 0           $bulk->{'exsattrs'} = [@pks];
73             ############### DEBUG #########
74 0 0         if ($c{'debug'}) {print(Dumper($bulk));}
  0            
75             # Supported as plain query w/o $dbh ?
76             #$bulk->{'exi'} = $p->exists(undef, 'attrs' => $attrs);
77            
78            
79            
80 0           bless($bulk, 'StoredHash::Bulk'); # __PACKAGE__
81             # TOO EARLY (need tgt conn)
82             #NOT:if ($c{'cacheid'}) {$bulk->makeidcache();}
83 0           return($bulk);
84             }
85             #
86             #sub new {
87             # my ($class, %opt) = @_;
88             # my $shp = StoredHash->new(%opt);
89             # bless($shp, $class);
90             #}
91              
92             # Create a cache of target system IDs to avoid probing presence of each entry
93             # individually during replication.
94             # TODO: Rely on StoredHash to load IDs
95             sub makeidcache {
96 0     0 0   my ($bulk, $dbh, $table, $id) = @_;
97 0           my $idcnt = scalar(@$id);
98             # Simple ID Maker
99 0     0     my $idmk = sub {($_[0]->[0], 1);};
  0            
100 0           my $q = "SELECT ".join(',', @$id)." FROM $table";
101 0           my $aoa = $dbh->selectall_arrayref($q);
102 0           my %idc = map({
103             # Create ':' -delimited composite keys
104 0           my $ck = join(":", splice(@$_, 0, 2));
105 0           ($ck, 1);
106            
107             } @$aoa);
108             # ID:s for target system
109 0           $bulk->{'idcache'} = \%idc;
110             }
111              
112             # TODO: Keep insert/update as wrappers, label actual op to keyword
113             # parameter "stack" to be passed downstream
114              
115             # Internal sub to implement EITHER insert or update (with very similar pattern)
116             # TODO: Support AoA ?
117             # - check for exist (prepared earlier at bulk constructor)
118             sub ins_or_upd {
119 0     0 0   my ($bulk, $arr, %c) = @_;
120 0   0       my $dbh = $c{'dbh'} || $bulk->{'shp'}->{'dbh'};
121 0           my $ok = 0;
122 0           my $debug = $c{'debug'};
123 0   0       my $p = $bulk->{'shp'} || die("No StoredHash for Bulk-op");
124 0   0       my $idc = $bulk->{'idcache'} || 0;
125 0           local $Data::Dumper::Terse = 1;
126             # Need more granular / custom attrs than bulk universal attrs ?
127 0           my (@attrs, @attrsupd, @pks);
128 0           my ($sth, $sthi, $sthu, $sthe);
129 0           my ($inscnt, $updcnt) = (0,0);
130             # Exec Ops for hash
131             my $ops = {
132             'ins' => sub {
133 0     0     my ($e) = @_;
134 0           $sth->execute(@$e{@attrs});
135             },
136             'upd' => sub {
137 0     0     my ($e) = @_;
138 0           $sth->execute(@$e{@attrs},); # @$e{@pks},
139             },
140             'insorup' => sub {
141 0     0     my ($e) = @_;
142 0           my $ok = 0;
143             # Exists => Update
144 0           my @pkv = @$e{@pks};
145 0           my $is = 0;
146 0 0         if ($idc) {
  0 0          
147 0 0         if ($idc->{join(':', @pkv)}) {$is = 1;}
  0            
148             }
149             elsif (_exists($sthe, \@pkv)) {$is = 1;}
150             ############################
151 0 0         if ($is) {
  0            
152             # ID attrs already included at end
153 0           $ok = $sthu->execute(@$e{@attrsupd},);$updcnt++;
  0            
154             }
155             # insert as new
156 0           else {$ok = $sthi->execute(@$e{@attrs},);$inscnt++;}
157 0           return($ok);
158             },
159 0           };
160 0   0       my $op = $c{'op'} || 'insorup';
161 0 0         if (!$ops->{$op}) {die("$op - No such operartion supported");}
  0            
162             # Allow blessed too !
163 0 0 0       if (!$arr || (reftype($arr) ne 'ARRAY')) {die("No Bulk Array to operate on !");}
  0            
164 0           @pks = $p->pkeys();
165            
166 0           DEBUG:my $qe = $bulk->{'exs'};
167             # This does not show lack of knowledge about datastructures, we merely
168             # want to extract max perf from lexicals statement handles
169 0 0         if ($op eq 'insorup') {
170 0           $sthe = $bulk->prepare($dbh, 'op' => 'exs', 'debug' => 1);
171 0           $sthi = $bulk->prepare($dbh, 'op' => 'ins', 'debug' => 1);
172 0           $sthu = $bulk->prepare($dbh, 'op' => 'upd', 'debug' => 1);
173             # Setup separate: @attrs /
174 0           @attrs = @{$bulk->{'insattrs'}};
  0            
175 0           @attrsupd = @{$bulk->{'updattrs'}};
  0            
176             # Add ID (In order)
177 0           push(@attrsupd, @pks);
178 0           local $Data::Dumper::Indent=0;
179 0           print("Final INS: ".Dumper(\@attrs)."\n\n");
180 0           print("Final UPD: ".Dumper(\@attrsupd)."\n\n");
181             }
182             # Need only single handle (in the callbacks)
183             else {
184 0           $sth = $bulk->prepare($dbh, 'op' => $op);
185 0           @attrs = @{$bulk->{$op.'attrs'}};
  0            
186             }
187             # Pick callback to execute
188 0           my $opcb = $ops->{$op};
189             # In case of update append the earlier stripped ID (for where clause)
190 0 0         if ($op eq 'upd') {push(@attrs, @pks);}
  0            
191 0           my $i = 0;
192 0           local $Data::Dumper::Terse = 1;local $Data::Dumper::Indent = 0;
  0            
193             #my $qiu = $bulk->{$op}; # These's no sigle op
194 0           for my $e (@$arr) {
195 0 0         if (reftype($e) ne 'HASH') {die("Not a hash to insert");}
  0            
196 0 0         if ($c{'dry'}) {
197 0           my @v = @$e{@attrs};
198 0           my @vu = @$e{@attrsupd};
199 0           print("QUERY:\n");
200             # OP:$bulk->{$op}
201 0           print("(\@v(ins)=)".Dumper(\@v)."\n");
202 0           print("(\@vu(upd)=)".Dumper(\@vu)."\n");
203             }
204             else {
205 0           my $ok = $opcb->($e);
206             # Could re-enable !!!!$DBI::errstr
207             #OLD:my $ok = $sth->execute(@$e{@attrs});
208 0 0         if (!$ok) {die("Failed to ins/upd: \n");} # $qiu
  0            
209             #print("NOP\n");
210             }
211 0 0         if ($debug) {print("Proc: $i\n");}
  0            
212 0           $i++;
213             }
214 0 0         if ($debug) {print("tot: $i, ins=$inscnt, upd=$updcnt\n");}
  0            
215 0           return($i);
216             }
217              
218             sub insert {
219 0     0 0   my ($bulk, $arr, %c) = @_;
220 0           $c{'op'} = 'ins';
221 0           $bulk->ins_or_upd($arr, %c);
222             }
223              
224             # Try to keep as a stub containing call to shared implementation
225             sub update {
226 0     0 0   my ($bulk, $arr, %c) = @_;
227 0           $c{'op'} = 'upd';
228 0           $bulk->ins_or_upd($arr, %c);
229             }
230              
231             # Insert or update
232             sub store {
233 0     0 0   my ($bulk, $arr, %c) = @_;
234             #my $dbh = $c{'dbh'} || $bulk->{'shp'}->{'dbh'};
235             #my $ok = 0;
236 0           $c{'op'} = 'insorup';
237             # OPTION 1
238 0           $bulk->ins_or_upd($arr, %c);
239             #my ($sth, $sthi, $sthu, $sthe);
240             }
241              
242             # Internal method to Prepare any of the ops bulk-op queries
243             sub prepare {
244 0     0 0   my ($bulk, $dbh, %c) = @_;
245 0           my $op = $c{'op'};
246 0           my $qi = $bulk->{$op};
247 0 0         if (!$qi) {die("No query for bulk-op");}
  0            
248 0           my $sth = $dbh->prepare($qi);
249 0 0         if (!$sth) {die("No statement for $op : ".$dbh->errstr()."");}
  0            
250 0 0         if ($c{'debug'}) {print("PREP($op): $qi\n\n");}
  0            
251 0           return($sth);
252             }
253              
254             # Keep not stepping on perl built-in
255             sub _exists {
256 0     0     my ($sthe, $idvals) = @_;
257 0           my $ok = $sthe->execute(@$idvals);
258 0 0         if (!$ok) {die("Failed to execute exist query by @$idvals");}
  0            
259 0           my $es = $sthe->fetchall_arrayref();
260             # Check multiple (likely a false ID field)
261 0           my $cnt = $es->[0]->[0];
262 0 0         if ($cnt > 1) {die("More than one entry for unique ID !");}
  0            
263 0           return($cnt);
264            
265             }
266             1;