File Coverage

blib/lib/DBIx/BulkUtil.pm
Criterion Covered Total %
statement 45 1595 2.8
branch 0 922 0.0
condition 0 440 0.0
subroutine 15 119 12.6
pod 8 12 66.6
total 68 3088 2.2


line stmt bran cond sub pod time code
1             package DBIx::BulkUtil;
2              
3 1     1   14136 use DBI;
  1         11499  
  1         40  
4 1     1   4 use Carp qw(confess);
  1         1  
  1         40  
5              
6 1     1   3 use strict;
  1         4  
  1         13  
7 1     1   2 use warnings;
  1         1  
  1         1037  
8              
9             our $VERSION = '0.05';
10              
11             # Override this
12             sub passwd {
13 0     0 1   return '';
14             }
15              
16             # Override this
17             sub user {
18 0     0 0   return '';
19             }
20              
21             {
22             my @connect_options = qw(
23             Server
24             Database
25             Env
26             Type
27             User
28             Password
29             DataDir
30             ConnectMethod
31             RetryCount
32             RetryMinutes
33             BulkLogin
34             NoBlankNull
35             Silent
36             NoCharset
37             NoServer
38             Dsl
39             DslOptions
40             DateFormat
41             DatetimeFormat
42             DatetimeTzFormat
43             );
44             my %is_valid;
45             $is_valid{$_}++ for @connect_options;
46              
47             sub _options_valid {
48 0     0     my $class = shift;
49 0           my %opts = @_;
50 0           for my $opt (keys %opts) {
51 0 0         return $opt if !$is_valid{$opt};
52             }
53 0           return;
54             }
55             }
56              
57             # Override this to set server, db, env, type based on whatever
58             sub env2db {
59 0     0 0   my ($self,$args) = @_;
60              
61 0 0 0       $args->{Type} ||= (!$args->{Server} && $args->{Database} ) ? 'Oracle' : 'Sybase';
      0        
62 0 0         if ( $args->{Type} eq 'SybaseIQ' ) {
63 0           $args->{IsIQ}++;
64 0           $args->{Type} = 'Sybase'
65             }
66             }
67              
68             sub connect {
69 0     0 1   my $class = shift;
70              
71             # Use HandleError sub instead of more straightforward RaiseError
72             # attribute because Sybase 1.09 does not include line numbers in its
73             # RaiseError die messages. And a stacktrace is usually more helpful
74             # anyway.
75 0           my $dbi_opts = {
76             ChopBlanks => 1,
77             AutoCommit => 1,
78             PrintError => 0,
79             RaiseError => 1,
80             LongReadLen => 1_024 * 1_024,
81             };
82              
83 0 0 0       if ( @_ and ref($_[-1]) ) {
84 0           my $tmp_opts = pop @_;
85 0           @$dbi_opts{keys %$tmp_opts} = values %$tmp_opts;
86             }
87 0           my $bad_opt = $class->_options_valid(@_);
88 0 0         die "Invalid option $bad_opt to ${class}->connect" if $bad_opt;
89              
90             # TODO: Log or Output option?
91 0           my %args = @_;
92 0           my $fh;
93 0 0         open($fh, ">", "/dev/null") if $args{Silent};
94 0 0         my $stdout = $args{Silent} ? $fh : \*STDOUT;
95 0           local *STDOUT = $stdout;
96              
97 0   0       my $connect = $args{ConnectMethod} || 'connect';
98              
99 0           $class->env2db( \%args );
100              
101             my @dsl_args = $args{Dsl}
102 0           ? ref($args{Dsl}) ? @{$args{Dsl}} : $args{Dsl}
103 0 0         : ();
    0          
104              
105 0           my $type = $args{Type};
106 0           my $database = $args{Database};
107 0   0       my $server = $args{Server} || '';
108              
109 0 0         if (!@dsl_args) {
110 0 0         if ( $type eq 'Sybase' ) {
    0          
111             # Server-side charset is iso, need to specify it as client-side charset
112             # or else we get utf8 to iso charset conversion error when database handle is cloned
113             # (which happens automatically when you need multiple active statement handles).
114 0 0         push @dsl_args, "server=$server" unless $args{NoServer};
115 0 0         push @dsl_args, 'charset=iso_1' unless $args{NoCharset};
116 0 0         push @dsl_args, 'bulkLogin=1' if $args{BulkLogin};
117             } elsif ( $type eq 'Oracle' ) {
118 0 0         push @dsl_args, $database unless $args{NoServer};
119             } else {
120 0           die "Unable to connect to database type $type";
121             }
122             }
123              
124             # For Xtra Dsl options
125             push @dsl_args, ref($args{DslOptions})
126 0           ? @{$args{DslOptions}} : $args{DslOptions}
127 0 0         if $args{DslOptions};
    0          
128              
129 0           my $dsl = "dbi:$type:";
130 0           $dsl .= join( ";", @dsl_args );
131              
132 0   0       my $user = $args{User} || $class->user(\%args);
133 0   0       my $passwd = $args{Password} || $class->passwd(\%args);
134              
135 0           my $dbh;
136 0   0       my $retry = int($args{RetryCount} || 0);
137              
138 0   0       my $retry_seconds = 60 * ($args{RetryMinutes} || 10);
139 0 0         $retry_seconds = 60 * 10 if $retry_seconds < 0;
140              
141 0 0         my $conn_name = ($type eq 'Sybase') ? $server : $database;
142 0           while (1) {
143              
144 0           print "Connecting to $conn_name\n";
145 0           $dbh = eval { DBI->$connect($dsl, $user, $passwd, $dbi_opts) };
  0            
146 0           my $err = $@;
147 0 0         unless ($dbh) {
148 0 0         die $err unless $retry-- > 0;
149 0           print "Unable to connect to $conn_name. Will retry in $retry_seconds seconds";
150 0           sleep $retry_seconds;
151 0           redo;
152             }
153              
154             # Make selected Sybase database the current database
155             # And make date formats consistent
156 0 0         if ( $type eq 'Sybase' ) {
    0          
157             # Switch database after connect so that we get a helpful error message
158             # and an error instead of a warning
159 0 0         if ($database) {
160 0           print "Using $database database\n";
161 0           my $result = eval { $dbh->do("USE $database") };
  0            
162 0           my $err = $@;
163 0 0         unless ($result) {
164 0 0         die $err unless $retry-- > 0;
165 0           $dbh->disconnect();
166 0           print "Unable to use database $database on $server. Will retry in $retry_seconds seconds\n";
167 0           sleep $retry_seconds;
168 0           redo;
169             }
170             };
171 0   0       $dbh->{syb_date_fmt} = $args{DateFormat} || 'ISO';
172              
173 0 0 0       $dbh->do("set temporary option Load_ZeroLength_AsNULL = 'ON'") if $args{IsIQ} and !$args{NoBlankNull};
174             } elsif ( $type eq 'Oracle' ) {
175             # Fractions on Oracle "DATE" format not allowed
176 0   0       my $date_fmt = $args{DateFormat} || 'YYYY-MM-DD HH24:MI:SS';
177 0   0       my $datetime_fmt = $args{DatetimeFormat} || 'YYYY-MM-DD HH24:MI:SS.FF';
178 0   0       my $datetime_tz_fmt = $args{DatetimeTzFormat} || $args{DatetimeFormat} || $datetime_fmt;
179 0           $_ = $dbh->quote($_) for $date_fmt, $datetime_fmt, $datetime_tz_fmt;
180 0           $dbh->do("alter session set nls_date_format=$date_fmt");
181 0           $dbh->do("alter session set nls_timestamp_format=$datetime_fmt");
182 0           $dbh->do("alter session set nls_timestamp_tz_format=$datetime_tz_fmt");
183             }
184 0           last;
185             }
186              
187             # We do not want stack trace on connect so that we do not expose password
188             # But everywhere else it is useful
189 0           $dbh->{RaiseError} = 0;
190 0     0     $dbh->{HandleError} = sub { confess $_[0] };
  0            
191 0 0         return $dbh unless wantarray;
192 0           my $util = DBIx::BulkUtil::Obj->new($dbh, $passwd, \%args);
193 0           return $dbh, $util;
194             }
195              
196             # Just set the connect method and call connect()
197             sub connect_cached {
198 0     0 1   my $class = shift;
199 0           my @args = $class->override({ConnectMethod => 'connect_cached'}, @_);
200 0           $class->connect(@args);
201             }
202              
203             sub syb_connect {
204 0     0 1   my $class = shift;
205 0           my @args = $class->override({ Type => 'Sybase' }, @_);
206 0           return $class->connect(@args);
207             }
208              
209             sub syb_connect_cached {
210 0     0 0   my $class = shift;
211 0           my @args = $class->override({ Type => 'Sybase' }, @_);
212 0           return $class->connect_cached(@args);
213             }
214              
215             sub ora_connect {
216 0     0 1   my $class = shift;
217 0           my @args = $class->override({ Type => 'Oracle' }, @_);
218 0           return $class->connect(@args);
219             }
220              
221             sub ora_connect_cached {
222 0     0 1   my $class = shift;
223 0           my @args = $class->override({ Type => 'Oracle' }, @_);
224 0           return $class->connect_cached(@args);
225             }
226              
227             sub iq_connect {
228 0     0 1   my $class = shift;
229 0           my @args = $class->override({ Type => 'SybaseIQ' }, @_);
230 0           return $class->connect(@args);
231             }
232              
233             sub iq_connect_cached {
234 0     0 1   my $class = shift;
235 0           my @args = $class->override({ Type => 'SybaseIQ' }, @_);
236 0           return $class->connect_cached(@args);
237             }
238              
239              
240             # Overriden connect args need to be spliced in before any dbi options
241             sub override {
242 0     0 0   my $self = shift;
243 0           my ($ovr, @args) = @_;
244 0 0         if ( (@args % 2) == 0 ) {
245 0           return @args, %$ovr;
246             }
247 0           my $dbi_opts = pop @args;
248 0 0 0       die "Last argument to connect must be hash reference" unless $dbi_opts and ref($dbi_opts);
249 0           return @args, %$ovr, $dbi_opts;
250             }
251              
252             package DBIx::BulkUtil::Obj;
253              
254             our $BCP_DELIMITER = '|';
255              
256 1     1   554 use Memoize qw(memoize);
  1         1562  
  1         45  
257 1     1   4 use Carp qw(confess);
  1         1  
  1         641  
258              
259             sub new {
260 0     0     my ( $class, $dbh, $passwd, $args ) = @_;
261 0           my $type = $dbh->{Driver}{Name};
262 0 0         if ($type eq 'Sybase') {
263 0           (my $version = $dbh->{syb_server_version_string}) =~ s|/.*||;
264 0 0         $type = 'SybaseIQ' if $version =~ /IQ/;
265             }
266 0 0         $class =~ s/::Obj$// or die "Invalid class $class";
267              
268 0           $class .= "::" . $type;
269 0           return $class->util($dbh, $passwd, $args);
270             }
271              
272             sub util {
273 0     0     my $class = shift;
274 0           my ($dbh, $pw, $args) = @_;
275 0 0         confess "Must use subclass of this package" if __PACKAGE__ eq $class;
276 0           my %util_args;
277              
278 0 0 0       if ( $args and ref($args) ) {
279 0 0         $util_args{NoBlankNull} = 1 if $args->{NoBlankNull};
280             }
281              
282             # Prevent dbh from disconnecting after fork in child processes
283 0           my $dbh_pid = $$;
284 0 0   0     my $release = DBIx::BulkUtil::Release->new(sub { $dbh->{InactiveDestroy} = 1 if $dbh_pid != $$ });
  0            
285 0           bless { DBH => $dbh, PASSWORD => $pw, DELIMITER => $BCP_DELIMITER, RELEASE => $release, %util_args }, $class;
286             }
287              
288 0     0     sub dbh { $_[0]->{DBH} }
289              
290             sub get {
291 0     0     my $self = shift;
292 0           my $select = shift;
293 0           my $dbh = $self->{DBH};
294 0           my @result = $dbh->selectrow_array( $self->row_select($select) );
295 0 0         return $result[0] if @result == 1;
296 0           return @result;
297             }
298              
299             sub exec_sp {
300 0     0     my $self = shift;
301 0           my $dbh = $self->{DBH};
302 0           $dbh->do($self->sp_sql(@_));
303             }
304              
305             sub bcp_out {
306 0     0     my $self = shift;
307 0           my $opts = {};
308 0 0         if (ref $_[-1]) {
309 0           $opts = pop @_;
310             }
311 0           my ( $table, $file ) = @_;
312 0   0       $file ||= "$table.bcp";
313              
314 0   0       my $delimiter = $opts->{Delimiter} || $self->{DELIMITER};
315 0   0       my $row_delim = $opts->{RowDelimiter} || $/;
316 0 0         my @esc = ( escape_char => $opts->{EscapeChar} ) if $opts->{EscapeChar};
317              
318             # Default to no quote char to be more compatible w/Sybase
319 0 0         my @quote_char = $opts->{QuoteFields} ? () : ( quote_char => undef, escape_char => undef );
320              
321             # TODO: Give up on Text::CSV ??
322 0           my $csv;
323 0 0         if ( length($delimiter) == 1 ) {
324 0           require Text::CSV_XS;
325 0           $csv = Text::CSV_XS->new({
326             binary => 1,
327             eol => $row_delim,
328             sep_char => $delimiter,
329             @esc,
330             @quote_char,
331             });
332             }
333              
334 0 0         my $col_list = $opts->{Columns} ? $opts->{Columns} : "*";
335              
336             # Only for HP?
337             #local $ENV{NLS_LANG} = "AMERICAN_AMERICA.WE8ROMAN8";
338              
339 0   0       my $enc_opt = $opts->{Encoding} || '';
340 0           my $db_type = $self->type();
341 0 0         if ( $db_type eq 'Oracle' ) {
342              
343 0 0         my $partition = ( $table =~ s/:(\w+)$// ) ? $1 : '';
344 0   0       my $nls_lang = $ENV{NLS_LANG} || '';
345 0 0         if ( $nls_lang =~ /utf8/i ) {
346 0   0       $enc_opt ||= 'utf8';
347             }
348 0 0         if ( $col_list eq '*' ) {
349 0           my @col_list;
350 0           my $col_info = $self->column_info($table);
351 0           my $list = $col_info->{LIST};
352 0           my $col_map = $col_info->{MAP};
353 0           my $xml_cnt;
354 0           for my $col (@$list) {
355 0 0         if ( $col_map->{$col}{TYPE_NAME} eq 'XMLTYPE' ) {
356 0           $xml_cnt++;
357 0           push @col_list, "XMLType.getclobval($col)";
358 0           next;
359             }
360 0           push @col_list, $col;
361             }
362 0 0         if ($xml_cnt) {
363 0           $col_list = join(",", @col_list);
364             }
365             }
366 0 0         $table = "$table PARTITION ($partition)" if $partition;
367             }
368 0 0         my $enc = $enc_opt ? ":encoding($enc_opt)" : '';
369              
370 0 0         open(my $fh, ">$enc", $file) or confess "Can not write to $file: $!";
371 0           my $sql = "SELECT $col_list FROM $table\n";
372 0 0         $sql .= $opts->{Filter} if $opts->{Filter};
373              
374 0           my $dbh = $self->{DBH};
375 0           my $sth = $dbh->prepare($sql);
376 0 0         $sth->{ChopBlanks} = 0 unless $opts->{TrimBlanks};
377 0           $sth->execute();
378 0 0         if ($opts->{Header}) {
379 0 0         if ($csv) {
380 0           $csv->print($fh, $sth->{NAME_lc});
381             } else {
382 0           print $fh join( $delimiter, @{$sth->{NAME_lc}} ), $row_delim;
  0            
383             }
384             }
385              
386 0           my $cnt = 0;
387 0           while ( my $row = $sth->fetchrow_arrayref() ) {
388 1     1   4 no warnings 'uninitialized';
  1         1  
  1         115  
389 0 0         if ($csv) {
390 0           $csv->print($fh, $row);
391             } else {
392 0           print $fh join($delimiter, @$row), $row_delim;
393             }
394 0           $cnt++;
395             }
396 0           close $fh;
397 0           return $cnt;
398             }
399              
400             {
401 1     1   4 no warnings 'once';
  1         1  
  1         2809  
402             *select2file = \&bcp_out;
403             }
404              
405             sub bcp_file {
406 0     0     my ($self, $file_in, $file_out) = @_;
407 0           my $opts = {};
408 0 0         if (ref $_[-1]) {
409 0           $opts = pop @_;
410             }
411              
412 0   0       my $delimiter = $opts->{Delimiter} || $self->{DELIMITER};
413 0   0       my $esc = $opts->{EscapeChar} || "\\";
414              
415 0 0         my @quote_char = $opts->{QuoteFields} ? () : ( quote_char => undef );
416 0           require Text::CSV_XS;
417 0           my $csv = Text::CSV_XS->new({
418             binary => 1,
419             eol => $/,
420             sep_char => $delimiter,
421             escape_char => $esc,
422             @quote_char,
423             });
424              
425 0 0         open(my $in_fh, "<", $file_in) or die "Err: $!";
426 0 0         open(my $out_fh, ">", $file_out) or die "Err: $!";
427 0           my $hdr = $csv->getline($in_fh);
428 0           $csv->column_names($hdr);
429 0 0         my @drop_cols = $opts->{DropCols} ? @{$opts->{DropCols}} : ();
  0            
430 0           my %drop; $drop{$_}++ for @drop_cols;
  0            
431              
432             my @cols =
433 0           $opts->{KeepCols} ? @{$opts->{KeepCols}}
434 0 0         : @drop_cols ? grep !$drop{$_}, @$hdr
    0          
435             : @$hdr;
436 0           my %hdr_idx; @hdr_idx{@$hdr} = 0..$#$hdr;
  0            
437              
438 0 0         $csv->print($out_fh, [@$hdr[@hdr_idx{@cols}]]) if $opts->{Header};
439 0           while ( my $row = $csv->getline_hr($in_fh) ) {
440 0           $csv->print($out_fh, [@$row{@cols}]);
441             }
442 0           close $in_fh;
443 0           close $out_fh;
444             }
445              
446             sub add_header {
447 0     0     my ($self, $table, $file, $opts) = @_;
448 0   0       $opts ||= {};
449              
450 0           my $cols;
451 0 0 0       if ( $opts->{Header} || $opts->{Columns} ) {
452             my $sel_str =
453             $opts->{Columns}
454             ? ref($opts->{Columns})
455 0           ? join(",", @{$opts->{Columns}})
456             : $opts->{Columns}
457 0 0         : '*';
    0          
458 0           my $sth = $self->{DBH}->prepare("SELECT $sel_str FROM $table WHERE 1=0");
459 0           $sth->execute();
460 0           $cols = $sth->{NAME_lc};
461 0           $sth->finish();
462             }
463              
464 0 0         return $self->add_quotes($table, $file, $cols, $opts) if $opts->{QuoteFields};
465              
466             # If quotes are not required, this is more efficient
467             # I doubt anyone uses either option anyway
468             # but highly doubt anyone uses the quoting
469 0           require File::Copy;
470 0   0       my $d = $opts->{Delimiter} || $self->{DELIMITER};
471 0   0       local $/ = $opts->{RowDelimiter} || "\n";
472              
473 0 0         open(my $fh, ">", "$file.bak") or die "Failed to open $file.bak: $!";
474              
475             # Unbuffer the filehandle for printing header
476             # because File::Copy uses unbuffered syswrite
477             # $fh->flush() after the print would also work depending on
478             # version of perl and whether IO::Handle is loaded
479 0           for ( select $fh ) { $| = 1; select $_ }
  0            
  0            
480 0           print $fh join($d, @$cols), $/;
481              
482 0 0         File::Copy::copy($file, $fh) or die "Failed to copy $file to $file.bak: $!";
483 0           close $fh;
484              
485 0           return "$file.bak";
486             }
487              
488             sub add_quotes {
489 0     0     my ($self, $table, $file, $cols, $opts) = @_;
490 0   0       my $d = $opts->{Delimiter} || $self->{DELIMITER};
491 0           my $dre = quotemeta($d);
492              
493 0           local ($_, $., $ARGV, *ARGV);
494 0           local ( $^I, @ARGV ) = ( '.bak', $file );
495 0   0       local $/ = $opts->{RowDelimiter} || "\n";
496 0           my $done;
497 0           while ( <> ) {
498 0 0 0       print join($d, @$cols), $/ if !$done++ && $opts->{Header};
499              
500 0 0         if ($opts->{QuoteFields}) {
501 0           chomp;
502 0           my @fields = split /$dre/;
503 0   0       /\s/ and $_ = qq("$_") for @fields;
504 0           $_ = join($d, @fields) . $/;
505             }
506 0           print;
507             }
508 0           return "$file.bak";
509             }
510              
511             sub type {
512 0     0     my $self = shift;
513 0           return $self->{DBH}{Driver}{Name};
514             }
515              
516             # Because of Sybase and its stupid mixed case column names,
517             # we need to be able to find the actual cased name for a given
518             # uncased column name.
519             # Just pray that there are not two columns with the same name
520             # in the same table that are differently cased.
521             memoize('column_info');
522              
523             sub column_info {
524             my $self = shift;
525             my $table = shift;
526              
527             my $schema;
528             my $dbtype = $self->type();
529             my ($tmp_db, $curr_db) = (undef,'');
530             my $dbh = $self->{DBH};
531             my %col_dflt;
532             if ( $dbtype eq 'Oracle' ) {
533             $table = uc($table);
534             if ( $table =~ /^(\w+)\.(\w+)$/ ) {
535             ($schema, $table) = ($1,$2);
536             } else { $schema = $self->curr_schema() }
537             } elsif ( $dbtype eq 'Sybase' ) {
538              
539             $tmp_db = $curr_db = $self->curr_db();
540              
541             if ( $table =~ /^#/ ) {
542             $table = $self->temp_table_name($table);
543             }
544              
545             if ( $table =~ /^(?:(\w+)\.)?(\w*)\.(#?\w+)$/ ) {
546             ($tmp_db, $schema, $table) = ($1,$2,$3);
547             $schema ||= undef;
548              
549             # We can only get column info on the current database
550             $dbh->do("USE $tmp_db") if defined($tmp_db) and $tmp_db ne $curr_db;
551             }
552              
553             $schema ||= '%';
554              
555             # Sybase gets metadata through a (under the DBD hood) stored proc, but does not return defaults.
556             # So get defaults here.
557             my $sth = $dbh->prepare( sprintf( $self->default_sql(), $table ) );
558             $sth->execute();
559             $sth->bind_columns( \my ( $col_name, $default ) );
560             while ( $sth->fetch ) {
561             $col_dflt{$col_name} .= $default;
562             }
563             }
564              
565             my $sth = $self->{DBH}->column_info($tmp_db, $schema, $table, '%');
566             my @names = @{$sth->{NAME_uc}};
567             my %row; $sth->bind_columns(\@row{@names});
568             my @list;
569             my %col_map;
570             my $col_cnt = 0;
571             while ( $sth->fetch() ) {
572              
573             # Data is probably in order, but we are not guaranteed
574             # So assign by index instead of pushing to array if possible
575             # IQ does not have ORDINAL_POSITION so fall back to select order
576             my $idx = defined($row{ORDINAL_POSITION}) ? $row{ORDINAL_POSITION}-1 : $col_cnt;
577             $col_cnt++;
578              
579             my $name = lc($row{COLUMN_NAME});
580             $list[$idx] = $name;
581             ($row{COLUMN_DEF} = $col_dflt{$name}) =~ s/^default\s*//i if defined($col_dflt{$name}) and !defined($row{COLUMN_DEF});
582             $col_map{$name} = { %row };
583             }
584             $dbh->do("USE $curr_db") if defined($tmp_db) and $tmp_db ne $curr_db;
585              
586             return unless $col_cnt;
587             my %col_info = (
588             LIST => \@list,
589             MAP => \%col_map,
590             );
591             return \%col_info;
592             }
593              
594             sub last_chg_list {
595 0     0     my $self = shift;
596 0           my ($table, $columns) = @_;
597              
598             # Determine if last_chg_user, last_chg_date need to be updated
599 0           my %chg_field = (last_chg_user => 1, last_chg_date => 1);
600 0           delete $chg_field{$_} for map lc, @$columns;
601 0           my %chg_cols;
602 0 0         if (%chg_field) {
603             # Are chg columns in table
604 0           my $col_info = $self->column_info($table);
605 0           my $col_map = $col_info->{MAP};
606 0           for my $c (keys %chg_field) {
607 0 0         $chg_cols{$c} = $col_map->{$c}{COLUMN_SIZE} if $col_map->{$c};
608             }
609             }
610              
611 0           return %chg_cols;
612             }
613              
614             sub key_columns {
615 0     0     my ($self, $table) = @_;
616              
617 0           my $pk = $self->primary_key($table);
618 0 0         return $pk if $pk;
619              
620 0           my $idx = $self->index_info($table);
621 0 0         return unless $idx;
622              
623             # Look for unique indexes with suffixes uk, pk, or key
624 0           my ($pk_name) = sort grep /(?i)(?:[pu]k|key)\d*$/, keys %$idx;
625 0 0         return $idx->{$pk_name} if $pk_name;
626              
627 0           my ($idx_name) = sort keys %$idx;
628 0           return $idx->{$idx_name};
629             }
630              
631             sub upd_columns {
632 0     0     my ($self, $table, $key_cols) = @_;
633              
634 0           my $col_data = $self->column_info($table)->{LIST};
635 0   0       $key_cols ||= $self->key_columns($table);
636 0 0         return unless $key_cols;
637              
638 0           my %is_key_col; $is_key_col{$_}++ for @$key_cols;
  0            
639 0           return [ grep !$is_key_col{$_}, @$col_data ];
640             }
641              
642             sub delete {
643 0     0     my ($self, $table, $where) = @_;
644              
645 0           my $dbh = $self->{DBH};
646 0           my $sql = "DELETE FROM $table";
647 0 0         $sql .= " WHERE $where" if $where;
648              
649 0           my $rows = $dbh->do($sql) + 0;
650              
651 0           print "$rows rows deleted\n";
652 0           return $rows;
653             }
654              
655             # Execute sql with retry on deadlocks
656             sub execute {
657 0     0     my ($self,$sth,@args) = @_;
658              
659             # We can pass a sql statement or a sth
660 0 0         $sth = $self->{DBH}->prepare($sth) if !ref($sth);
661              
662 0           my $retry = 5;
663 0           for (1..$retry) {
664 0           my $status = eval { $sth->execute(@args) };
  0            
665 0 0         return $status if $status;
666 0 0         confess $@ unless $@ =~ /deadlock/i;
667 0           print "Deadlock detected on retry $_ of 5\n";
668 0 0         sleep 2 if $_ < $retry;
669             }
670 0           confess $@;
671             }
672              
673             sub ora_date_fmt {
674              
675             # Not very OO-ish but allow calling the Oracle date mask routine
676             # From any generic utility object
677 0     0     my $self = shift;
678 0           DBIx::BulkUtil::Oracle->date_mask(@_);
679              
680             }
681              
682             sub strptime_fmt {
683 0     0     my ($class, $str, $fmt) = @_;
684 0   0       $fmt ||= DBIx::BulkUtil::Oracle->date_mask($str);
685 0 0         return undef unless $fmt;
686 0           for ($fmt) {
687 0           s/MONTH/%B/;
688 0           s/MON/%b/;
689 0           s/MM/%m/;
690 0           s/DD/%d/;
691 0           s/YYYY/%Y/;
692 0           s/YY/%y/;
693 0           s/RRRR/%Y/;
694 0           s/RR/%y/;
695 0           s/HH24/%H/;
696 0           s/HH(?:12)?/%I/;
697 0           s/MI/%M/;
698 0           s/SS/%S/;
699 0           s/AM/%p/;
700 0           s/DY/%a/;
701 0           s/DAY/%a/;
702 0           s/TZD/%Z/;
703 0           s/TZH.TZD/%z/;
704 0           s/"(.)"/$1/g;
705             }
706 0           return $fmt;
707             }
708              
709             sub blk_prepare {
710 0     0     my ($self, $table, %args) = @_;
711 0   0       my $blk_opts = $args{BlkOpts} || {};
712 0   0       my $commit = $args{CommitSize} || 1000;
713 0           my $con = $args{Constants};
714              
715 0 0         my $col_info = $self->column_info($table) or confess "Table $table does not exist";
716 0           my @col_list = @{$col_info->{LIST}};
  0            
717 0           my $arg_len = @col_list;
718              
719 0           my $col_cnt = @col_list;
720 0           my $sql = "INSERT INTO $table VALUES (" . join(",", ("?") x $col_cnt) . ")";
721 0           my $type = $self->type();
722 0 0         my @blk_opts = ($type eq 'Sybase')
723             ? { syb_bcp_attribs => $blk_opts }
724             : ();
725              
726 0           my $dbh = $self->{DBH};
727 0           my $sth = $dbh->prepare($sql, @blk_opts);
728              
729 0           my ($exec_f,$commit_f,$finish_f);
730 0           my @ex_arg_list = (undef) x @col_list;
731 0           my $cnt = 0;
732 0 0         if ($con) {
733 0           my %const = %$con;
734 0           my @c_list = keys %const;
735              
736 0           my %col_pos;
737 0           @col_pos{@col_list} = 0..$#col_list;
738 0           my %const_pos;
739 0           @const_pos{@c_list} = delete @col_pos{@c_list};
740 0           $arg_len = keys %col_pos;
741              
742             # Create arg array for execute method
743             # Set constants and create sub for all but constant args
744 0           @ex_arg_list[@const_pos{@c_list}] = @const{@c_list};
745 0           my @non_const = sort { $a <=> $b } values %col_pos;
  0            
746 0 0         $sth->{HandleError} = undef if $type eq 'Oracle';
747              
748 0 0         if ($type eq 'Sybase') {
749 0     0     $exec_f = sub { @ex_arg_list[@non_const] = @_; $sth->execute(@ex_arg_list) };
  0            
  0            
750 0     0     $commit_f = sub { $dbh->commit() };
  0            
751 0     0     $finish_f = sub { $dbh->commit(); $sth->finish(); $sth = undef };
  0            
  0            
  0            
752             } else {
753 0     0     $exec_f = sub { my $i=0; push @{$ex_arg_list[$_]}, $_[$i++] for @non_const };
  0            
  0            
  0            
754             $commit_f = sub {
755 0     0     my ($t,$r) = $sth->execute_array({ ArrayTupleStatus => \my @status }, @ex_arg_list);
756 0 0         unless (defined $t) {
757 0           for my $i (0..$#status) {
758 0 0         next unless ref $status[$i];
759 0 0         my @row = map { ref($ex_arg_list[$_]) ? qq('$ex_arg_list[$_][$i]') : $ex_arg_list[$_] } 0..$#ex_arg_list;
  0            
760 0           confess "Error: [$status[$i][1]] inserting [".join(",", @row)."]";
761             }
762             }
763 0           $_ = [] for @ex_arg_list[@non_const];
764 0           $r;
765 0           };
766 0 0   0     $finish_f = sub { $commit_f->() if $cnt > 0 };
  0            
767             }
768             } else {
769 0 0         if ($type eq 'Sybase') {
770 0     0     $exec_f = sub { $sth->execute(@_); '0E0' };
  0            
  0            
771 0     0     $commit_f = sub { $dbh->commit(); $cnt };
  0            
  0            
772 0 0   0     $finish_f = sub { $dbh->commit(); $sth->finish(); $sth = undef; ( $cnt > 0 ) ? $cnt : '0E0' };
  0            
  0            
  0            
  0            
773             } else {
774 0     0     $exec_f = sub { my $i=0; push @{$ex_arg_list[$_]}, $_[$_] for 0..$#ex_arg_list; return '0E0' };
  0            
  0            
  0            
  0            
775             $commit_f = sub {
776 0     0     my ($t,$r) = $sth->execute_array({ ArrayTupleStatus => \my @status }, @ex_arg_list);
777 0 0         unless (defined $t) {
778 0           for my $i (0..$#status) {
779 0 0         next unless ref $status[$i];
780 0           my @row = map { qq('$ex_arg_list[$_][$i]') } 0..$#ex_arg_list;
  0            
781 0           confess "Error: [$status[$i][1]] inserting [".join(",", @row)."]";
782             }
783             }
784 0           $_ = [] for @ex_arg_list;
785 0           $r;
786 0           };
787 0 0   0     $finish_f = sub { ( $cnt > 0 ) ? $commit_f->() : '0E0' };
  0            
788             }
789             }
790              
791 0           bless {
792             CNT => \$cnt,
793             COMMIT_SIZE => $commit,
794             EXEC_FUNC => $exec_f,
795             COMMIT_FUNC => $commit_f,
796             FINISH_FUNC => $finish_f,
797             ARG_LEN => $arg_len,
798             }, "DBIx::BulkUtil::BLK";
799             }
800              
801             sub prepare {
802 0     0     my $self = shift;
803 0           my %opt = @_;
804 0           my $table = $opt{Table};
805 0           my $sql = $opt{Sql};
806 0           my $columns = $opt{Columns};
807 0           my $href = $opt{BindHash};
808 0           my $aref = $opt{BindArray};
809             my $by_name =
810             defined($opt{ByName}) ? $opt{ByName}
811 0 0 0       : ( !$href && !$aref ) ? 0
    0          
    0          
812             : ($self->type() eq 'Sybase' ) ? 0
813             : 1;
814 0 0 0       confess "Can not supply both BindHash and BindArray" if $href && $aref;
815 0 0 0       confess "Can not use BindHash or BindArray without ByName" if ( $href || $aref ) && !$by_name;
      0        
816              
817 0 0 0       confess "Must supply Table or Sql to prepare" unless $table || $sql;
818 0 0 0       confess "Can not supply both Table and Sql to prepare" if $table && $sql;
819              
820 0           my $dflt_col = eval {
821 0 0 0       $columns ||= $self->column_info($table)->{LIST} if $table;
822 0           1;
823             };
824 0 0         confess "Table $table not found in datbase" unless $dflt_col;
825              
826 0 0 0       if ( $columns && @$columns ) {
827              
828             # A little overkill to get a nicely formatted SQL statement
829 0 0         my $c_sep = ( @$columns > 5 ) ? "\n" : '';
830 0           my $cnt;
831 0 0   0     my $c_ind = ( @$columns > 5 ) ? sub { ' ' } : sub { $cnt++ ? ' ' : '' };
  0 0          
  0            
832              
833 0           my $h_cnt;
834 0 0   0     my $h_ind = ( @$columns > 5 ) ? sub { ' ' } : sub { $h_cnt++ ? ' ' : '' };
  0 0          
  0            
835              
836 0 0         my $v_sep = $by_name ? ( @$columns > 5 ) ? "\n" : '' : '';
    0          
837 0 0   0     my $hold = $by_name ? sub { $h_ind->() . ":$_" } : sub { "?" };
  0            
  0            
838              
839 0   0       $sql ||= sprintf("INSERT INTO $table ($c_sep%s$c_sep) VALUES ($v_sep%s$v_sep)",
840             join(",$c_sep", map $c_ind->() . $_, @$columns),
841             join(",$v_sep", map $hold->(), @$columns),
842             );
843             }
844 0           print "Preparing: $sql\n";
845 0           my $sth = $self->dbh->prepare($sql);
846              
847 0 0         if ($href) {
    0          
848 0           $sth->bind_param_inout( ":$_" => \$href->{$_}, 0 ) for @$columns;
849             } elsif ($aref) {
850 0           $sth->bind_param_inout( ":$columns->[$_]" => \$aref->[$_], 0 ) for 0..$#$columns;
851             }
852              
853 0           return $sth;
854              
855             }
856              
857             sub prepare_upd {
858              
859 0     0     my $self = shift;
860              
861 0           my %args = @_;
862              
863 0   0       my $table = $args{Table} || die "Must supply Table option";
864              
865 0           my $col_info = $self->column_info($table);
866 0   0       my $col_list = $args{Columns} || $col_info->{LIST};
867              
868 0   0       my $key_cols = $args{KeyCols} || $self->key_columns($table);
869 0   0       my $upd_cols = $args{UpdCols} || $self->upd_columns($table);
870              
871 0           my $sql = <
872             UPDATE $table
873             SET
874 0           @{[ join( ",\n ", map "$_ = ?", @$upd_cols )]}
875             WHERE
876 0           @{[ join( " AND\n ", map "$_ = ?", @$key_cols )]}
877             SQL
878 0           print "Preparing: $sql\n";
879 0           my $sth = $self->{DBH}->prepare($sql);
880              
881 0           my %col_pos;
882 0           my $cnt = 0;
883 0           $col_pos{$_} = $cnt++ for @$col_list;
884              
885 0           my @sth_pos;
886 0           push @sth_pos, $col_pos{$_} for @$upd_cols, @$key_cols;
887              
888             return sub {
889 0 0   0     unless (@_) {
890 0           $sth->finish();
891 0           undef $sth;
892 0           undef @sth_pos;
893 0           return;
894             }
895 0           $sth->execute(@_[@sth_pos]);
896             }
897 0           }
898              
899 0     0     sub is_iq { 0 }
900              
901             package DBIx::BulkUtil::BLK;
902              
903 1     1   5 use Carp qw(confess);
  1         1  
  1         148  
904              
905             sub execute {
906 0     0     my $self = shift;
907 0 0         unless (@_ == $self->{ARG_LEN}) {
908 0           my $arg_cnt = @_;
909 0           confess "Execute argument count $arg_cnt must be $self->{ARG_LEN}";
910             }
911 0           my $f = $self->{ARG_FUNC};
912 0           my $rows = $self->{EXEC_FUNC}->(@_);
913 0           my $cnt = $self->{CNT};
914 0 0         if ( ++$$cnt >= $self->{COMMIT_SIZE} ) {
915 0           $rows = $self->{COMMIT_FUNC}->();
916 0           $$cnt = 0;
917             }
918 0           return $rows;
919             }
920              
921             sub finish {
922 0     0     my $self = shift;
923 0           $self->{FINISH_FUNC}->();
924             }
925              
926             package DBIx::BulkUtil::Sybase;
927              
928 1     1   3 use Carp qw(confess);
  1         1  
  1         1360  
929              
930             our @ISA = qw(DBIx::BulkUtil::Obj);
931              
932 0     0     sub now { 'getdate()' };
933              
934             sub add {
935 0     0     my $self = shift;
936 0           my $date = shift;
937 0           my $n = shift;
938 0           my $unit = shift;
939 0           my $new_date = "dateadd( $unit, $n, $date)";
940 0 0         return $new_date unless @_;
941 0           return $self->add( $new_date, @_ );
942             }
943              
944             sub diff {
945 0     0     my $self = shift;
946 0           my $date1 = shift;
947 0           my $date2 = shift;
948 0           my $unit = shift;
949 0           my $new_date = "datediff( $unit, $date1, $date2)";
950 0           return $new_date;
951             }
952              
953             sub row_select {
954 0     0     my $self = shift;
955 0           my $sel = shift;
956 0           return "select $sel";
957             }
958              
959             sub sp_sth {
960 0     0     my $self = shift;
961 0           my $sth = $self->{DBH}->prepare($self->sp_sql(@_));
962 0           $sth->execute();
963 0           return $sth;
964             }
965              
966             sub sp_sql {
967 0     0     my $self = shift;
968 0           my ($stored_proc, @args) = @_;
969 0           return "exec " . join(" ", $stored_proc, join(",", map {$self->{DBH}->quote($_)} grep !/^:cursor$/, @args));
  0            
970             }
971              
972             # This is trivial in Sybase, but a necessary function for Oracle
973             # and so makes this portably compatible
974             sub to_datetime {
975 0     0     my $self = shift;
976 0           my $date = shift;
977              
978 0           return "'$date'";
979             }
980              
981             sub bcp_in {
982 0     0     my $self = shift;
983 0 0         my $optref = (ref $_[-1]) ? pop @_ : {};
984 0           my %opts = %$optref;
985              
986 0           my ( $table, $file, $dir ) = @_;
987 0 0         my $partition = ( $table =~ s/(:\d+)$// ) ? $1 : '';
988              
989 0   0       $file ||= "$table.bcp";
990 0   0       $dir ||= 'in';
991              
992 0           my $dbh = $self->{DBH};
993 0           my $db = $dbh->{Name};
994 0 0         $db =~ /server=(\w+)/ or confess "Can't determine server for bcp";
995 0           my $server = $1;
996 0           my $database = $self->curr_db();
997              
998 0           my $user = $dbh->{Username};
999 0   0       my $delimiter = $opts{Delimiter} || $self->{DELIMITER};
1000 0   0       my $row_delimiter = $opts{RowDelimiter} || "\n";
1001 0   0       my $commit_size = $opts{CommitSize} || 1000;
1002              
1003 0 0 0       my $bcp_table =
    0          
    0          
1004             (!$database or $table =~ /^\w+\.\w*\.\w+$/) ? $table
1005             : ($table =~ /^\w+$/) ? "$database..$table"
1006             : ($table =~ /^\w*\.\w+$/) ? "$database.$table"
1007             : confess "Can not determine database for bcp";
1008              
1009 0           $bcp_table .= $partition;
1010              
1011             # Simulate Oracle sqlldr Append/Replace/Truncate
1012 0           my $id_cnt;
1013 0 0         if ( $dir eq 'in' ) {
1014 0   0       my $mode = $opts{Action} || "A";
1015 0 0         if ( $mode eq 'T' ) {
    0          
1016 0           my $sql = "TRUNCATE TABLE $bcp_table";
1017 0           print "Executing: $sql\n";
1018 0           $dbh->do($sql);
1019             } elsif ($mode eq 'R') {
1020 0           $self->delete($bcp_table, '', $commit_size);
1021             }
1022 0 0         confess "BCP file $file does not exist" unless -f $file;
1023              
1024             # Save some work
1025             # checking underscore ok, we just did -f above
1026 0 0         unless ( -s _ ) {
1027 0           print "$file is empty. Skipping bcp\n";
1028              
1029             # Make any log file parsers happy
1030 0           print "0 rows copied\n";
1031 0           return 0;
1032             }
1033              
1034             # All this to decide whether or not to use '-E'
1035             # Only use '-E' if there is an identity column
1036             # And GenerateId is false
1037 0 0         unless ( $opts{GenerateId} ) {
1038 0           my $col_info = $self->column_info($table);
1039 0           my $col_map = $col_info->{MAP};
1040 0 0         if ($col_map) {
1041 0           for my $c ( values %$col_map ) {
1042 0 0 0       ++$id_cnt and last if $c->{TYPE_NAME} =~ /identity/;
1043             }
1044             }
1045             }
1046             }
1047              
1048 0 0         my ($action,$to_from) = ($dir eq 'in') ? ('Loading', 'from') : ('Exporting', 'to');
1049 0           print "$action $server/$bcp_table $to_from $file\n";
1050              
1051 0           my (@max_err_opt, @commit_opt, @header_opt, @id_opt);
1052 0   0       my $max_err_cnt = $opts{MaxErrors} || 0;
1053 0 0         if ( $dir eq 'in' ) {
1054 0           @max_err_opt = (-m => $max_err_cnt);
1055 0           @commit_opt = (-b => $commit_size);
1056 0 0         @header_opt = (-F => $opts{Header}+1) if $opts{Header};
1057 0 0         @id_opt = "-E" if $id_cnt;
1058             }
1059              
1060 0   0       my $keep_temp = $opts{KeepTempFiles} || $opts{Debug};
1061 0   0       my $in_temp_dir = $opts{TempDir} || $opts{Debug};
1062 0           my $temp_dir;
1063 0 0 0       $temp_dir = $opts{TempDir} || "." if $in_temp_dir;
1064              
1065 0           require File::Temp;
1066 0 0         my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
1067 0 0         my @unlink = $keep_temp ? (UNLINK => 0) : ();
1068 0           my $error_file = File::Temp->new(
1069             TEMPLATE => "${table}_XXXXX",
1070             SUFFIX => ".err",
1071             @temp_dir, @unlink,
1072             );
1073 0           chmod(0664, $error_file->filename());
1074 0           $error_file->close();
1075              
1076 0 0         my @packet_size = $opts{PacketSize} ? ( -A => $opts{PacketSize} ) : ();
1077 0 0         my @passthru = $opts{PassThru} ? @{$opts{PassThru}} : ();
  0            
1078              
1079 0           my ( $fmt_file, $tmp_fmt_file );
1080 0 0 0       if ( $opts{FormatFile} ) {
    0 0        
      0        
1081 0           $fmt_file = $opts{FormatFile};
1082 0           } elsif ( ( $opts{ColumnList} && $opts{ColumnList} ) || ( $opts{Filler} && @{$opts{Filler}} ) ) {
1083             ($tmp_fmt_file,$fmt_file) = $self->mk_fmt_file(
1084             Table => $table,
1085             Delimiter => $delimiter,
1086             RowDelimiter => $row_delimiter,
1087             ColumnList => $opts{ColumnList},
1088             Filler => $opts{Filler},
1089             TempDir => $opts{TempDir},
1090             FormatFileName => $opts{FormatFileName},
1091 0           KeepTempFiles => $keep_temp,
1092             );
1093             }
1094 0 0         my @fmt_file_opt = $fmt_file ? ( -f => $fmt_file ) : '-c';
1095              
1096             # UTF-8 doesn't work on HP - default is roman8 on HP
1097             # Should probably make '-J' some kind of option, with maybe
1098             # a map of OS types and default values. But leave that for
1099             # a later date.
1100 0           my @cmd = ( bcp => $bcp_table, $dir, $file,
1101             -U => $user,
1102             #-J => "utf8",
1103             -S => $server,
1104             -t => $delimiter,
1105             -r => $row_delimiter,
1106             -e => $error_file->filename(),
1107             @header_opt,
1108             @id_opt,
1109             @commit_opt,
1110             @max_err_opt,
1111             @packet_size,
1112             @passthru,
1113             @fmt_file_opt,
1114             );
1115 0           print "Executing: @cmd\n";
1116 0           push @cmd, -P => $self->{PASSWORD};
1117 0 0         open(my $fh, "-|", @cmd) or confess "Can't exec bcp: $!";
1118              
1119 0           my ($rows, $failed, $partially_failed);
1120 0           local ($_, $.);
1121              
1122 0           my $err_cnt = my $c_lib_err_cnt = my $srvr_err_cnt = 0;
1123 0           while (<$fh>) {
1124 0           print;
1125 0 0         if ( /^(Server|C[TS]LIB) Message/ ) {
1126 0           my $msg_type = $1;
1127 0 0         if ( $msg_type eq 'CSLIB' ) {
    0          
1128 0 0         if ( m|/N(\d+)| ) {
1129             # Sybase says truncation is not an error, so we will too
1130             # Or else we might get > 1 error on the same row
1131 0 0         unless ( $1 == 36 ) {
1132 0           $err_cnt++;
1133 0           $c_lib_err_cnt++;
1134             }
1135             }
1136             } elsif ( $msg_type eq 'CTLIB' ) {
1137 0           $err_cnt++;
1138 0           $c_lib_err_cnt++;
1139             } else {
1140             # On server errors the whole batch is an error
1141 0 0         if ( /\s(\d+)/ ) {
1142             # Ignore 'slow bcp' warning
1143 0 0         unless ( $1 == 4852 ) {
1144 0           $err_cnt += $commit_size;
1145 0           $srvr_err_cnt += $commit_size;
1146             }
1147             } else {
1148 0           $err_cnt += $commit_size;
1149 0           $srvr_err_cnt += $commit_size;
1150             }
1151             }
1152             }
1153 0 0         $rows = $1 if /^(\d+) rows copied/;
1154              
1155             # failed or partially failed
1156 0 0         if ( /^bcp copy in ((?:partially )?)failed/ ) {
1157 0 0         $partially_failed++ if $1;
1158 0           $failed++;
1159             }
1160             }
1161              
1162             # "NaN" (literally "NaN") to numeric errors
1163             # do not show up on STDOUT.
1164             # So we may as well search the err file to count
1165             # all CSLIB and CTLIB errors.
1166             # Truncation errors do not show up in file, so we
1167             # don't have to filter them out as we would if we
1168             # were parsing STDOUT.
1169 0           my $err_file_cnt = 0;
1170 0 0         open(my $err_h, "<", $error_file->filename()) or die "Failed to open $error_file: $!";
1171 0           while (<$err_h>) {
1172 0 0         $err_file_cnt++ if /^#@ Row \d+: Not transferred/;
1173             }
1174 0           close $err_h;
1175              
1176 0 0         if ( $err_file_cnt > $c_lib_err_cnt ) {
1177 0           $err_cnt += $err_file_cnt - $c_lib_err_cnt;
1178             }
1179              
1180             # BCP 11.x,12.x returns meaningful exit status
1181             # 10.x does not (returns 0 even on errors)
1182 0           my $close_success = close $fh;
1183              
1184 0 0         unless ($close_success) {
1185 0           my $exit_stat = $? >> 8;
1186 0           my $exit_sig = $? & 127;
1187 0           my $exit_core = $? & 128;
1188              
1189             # bcp will exit with non-zero status on any 'Server' error,
1190             # but not on 'CSLIB' errors unless 'CSLIB' error count exceeds max.
1191 0 0         if ( $exit_stat != 0 ) {
1192 0 0         if ( $dir eq 'in' ) {
1193              
1194             # Some of this may seem unneccessary, but Sybase bcp is
1195             # horribly inconsistent.
1196              
1197             # Exceeded the error count
1198 0 0         confess "BCP error - max error count ($max_err_cnt) exceeded - bcp returned status $exit_stat: $!"
1199             if $err_cnt > $max_err_cnt;
1200              
1201             # The load was aborted before bcp indicated that it finished
1202 0 0 0       confess "BCP error - bcp aborted [$exit_stat]: $!"
1203             if !defined($rows) and !$failed;
1204              
1205             # BCP failed - even if we allow some errors on a small file, if zero rows are copied
1206             # then call it a total failure.
1207 0 0 0       confess "BCP error - bcp failed [$exit_stat]: $!"
1208             if $failed and !$partially_failed;
1209              
1210             } else {
1211 0           confess "BCP error - bcp returned status $exit_stat: $!";
1212             }
1213             }
1214              
1215 0 0         confess "BCP error - bcp recieved signal $exit_sig" if $exit_sig > 0;
1216 0 0         confess "BCP error - bcp coredumped" if $exit_core;
1217             }
1218              
1219             # Will miss error count exceeded error on 10.x
1220             # But will catch other errors if load is aborted
1221             # Or no rows are loaded.
1222 0 0         confess "BCP error - no rows copied" if !defined($rows);
1223              
1224             # CTLIB errors do not cause non-zero exit - so catch them here
1225 0 0         confess "BCP error - max error count ($max_err_cnt) exceeded" if $err_cnt > $max_err_cnt;
1226 0   0       $rows ||= 0;
1227 0           return $rows;
1228             }
1229              
1230             {
1231 1     1   4 no warnings 'once';
  1         2  
  1         3597  
1232             *bcp = \&bcp_in;
1233             }
1234              
1235             sub mk_fmt_file {
1236 0     0     my $self = shift;
1237 0           my %opts = @_;
1238              
1239 0   0       my $table = $opts{Table} || die "Table required for mk_fmt_file";
1240 0           my $col_info = $self->column_info($table);
1241 0           my $db_col_list = $col_info->{LIST};
1242 0           my %is_db_column;
1243 0           $is_db_column{$_}++ for @$db_col_list;
1244 0           my %is_filler;
1245 0 0         if ( $opts{Filler} ) {
1246 0           $is_filler{lc($_)}++ for @{$opts{Filler}};
  0            
1247             }
1248              
1249 0           my ($tmp_fmt_file,$fmt_file);
1250 0 0         if ( $opts{FormatFileName} ) {
1251 0           $fmt_file = $opts{FormatFileName};
1252             } else {
1253 0           require File::Temp;
1254 0   0       my $keep_temp = $opts{KeepTempFiles} || $opts{Debug};
1255 0   0       my $in_temp_dir = $opts{TempDir} || $opts{Debug};
1256 0           my $temp_dir;
1257              
1258 0 0 0       $temp_dir = $opts{TempDir} || "." if $in_temp_dir;
1259 0 0         my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
1260 0 0 0       my @unlink = ( $keep_temp || !defined(wantarray) ) ? (UNLINK => 0) : ();
1261 0           $tmp_fmt_file = File::Temp->new(
1262             TEMPLATE => "${table}_XXXXX",
1263             SUFFIX => ".fmt",
1264             @temp_dir, @unlink,
1265             );
1266 0           $fmt_file = $tmp_fmt_file->filename();
1267 0           chmod(0664, $tmp_fmt_file);
1268 0           $tmp_fmt_file->close();
1269             }
1270              
1271 0   0       my $delim = $opts{Delimiter} || "|";
1272 0   0       my $row_delim = $opts{RowDelimiter} || "\n";
1273              
1274             # Need escaped text in fmt file
1275             # for CR/LF
1276 0           for ($delim,$row_delim) {
1277 0           s/\n/\\n/g;
1278 0           s/\r/\\r/g;
1279             }
1280              
1281             my @col_list = ( $opts{ColumnList} && @{$opts{ColumnList}} )
1282 0           ? @{$opts{ColumnList}}
1283 0 0 0       : @{$col_info->{LIST}};
  0            
1284              
1285 0           my $ncols = @col_list;
1286 0 0         open( my $fh, ">", $fmt_file ) or confess "Failed to open $fmt_file: $!";
1287 0           print $fh "10.0\n";
1288 0           print $fh "$ncols\n";
1289              
1290 0           my $col_map = $col_info->{MAP};
1291 0           for my $i (1..$ncols) {
1292 0           my $name = $col_list[$i-1];
1293 0 0         my $d = ( $i == $ncols ) ? $row_delim : $delim;
1294 0           my @row = ($i, 'SYBCHAR', 0);
1295 0 0         if ($is_filler{lc($name)}) {
    0          
1296 0           push @row, 0, qq["$d"], 0;
1297             } elsif ($is_db_column{lc($name)}) {
1298 0           my $info = $col_map->{lc($name)};
1299              
1300             # Native Sybase date format size is 26 though metadata says 23
1301             # For numbers, add extra for decimal
1302             my $size =
1303             ( $info->{TYPE_NAME} =~ /date/ ) ? 26
1304             : ( $info->{TYPE_NAME} =~ /char|text/ ) ? $info->{COLUMN_SIZE}
1305 0 0         : $info->{COLUMN_SIZE} + 1;
    0          
1306 0           push @row, $size, qq["$d"], $info->{ORDINAL_POSITION}, $name;
1307 0           } else { confess "$name is neither a db nor filler column" }
1308 0           print $fh join("\t", @row), "\n";
1309             }
1310              
1311 0           close $fh;
1312              
1313              
1314             # Also return temp object so it will not be cleaned up yet
1315             return
1316 0 0         wantarray ? ($tmp_fmt_file, $fmt_file)
    0          
1317             : $tmp_fmt_file ? $tmp_fmt_file
1318             : $fmt_file;
1319              
1320             }
1321              
1322             sub bcp_out {
1323 0     0     my $self = shift;
1324 0           my @opts;
1325 0 0         if (ref $_[-1]) {
1326 0           @opts = pop @_;
1327             }
1328 0           my ($table, $file) = @_;
1329 0   0       $file ||= "$table.bcp";
1330              
1331 0 0 0       my $scratchdb = @opts ? $opts[0]{TempDb} || 'scratchdb' : 'scratchdb';
1332              
1333             # Sybase rounds money columns, need to bcp a view of it
1334             # if any exist.
1335 0           my $dbh = $self->{DBH};
1336              
1337             # Need to save current db in case view is created
1338 0           my $curr_db = $self->curr_db();
1339 0           my $view = $self->mk_view($table, @opts);
1340              
1341 0   0       my $rows = eval { $self->bcp($view || $table, $file, 'out', @opts) };
  0            
1342 0 0         unless (defined $rows) {
1343 0           my $err = $@;
1344 0 0         if ($view) {
1345 0           warn "BCP error detected - dropping view $view\n";
1346 0           my $result = eval { $dbh->do("DROP VIEW $view") };
  0            
1347 0 0         warn "Unable to drop view $view: $@" unless $result;
1348 0 0 0       $dbh->do("USE $curr_db") if !$self->is_iq() and $curr_db;
1349             }
1350 0           confess $err;
1351             }
1352              
1353 0 0         if ($view) {
1354 0           print "Dropping view $view\n";
1355 0           $dbh->do("DROP VIEW $view");
1356 0 0 0       $dbh->do("USE $curr_db") if !$self->is_iq() and $curr_db;
1357             }
1358 0 0 0       if ( !@opts or !$opts[0]{NoFix} ) {
1359 0           my $bak = eval { $self->fix_bcp_file($file, @opts) };
  0            
1360 0 0         if ( $bak ) {
1361 0           unlink $bak;
1362             } else {
1363 0           warn "Error processing $file. BCP file in $file.bak: $@\n";
1364 0           return;
1365             }
1366             }
1367 0 0 0       if ( @opts and ( $opts[0]{Header} || $opts[0]{QuoteFields} ) ) {
      0        
1368 0           my $bak = eval { $self->add_header($table, $file, @opts) };
  0            
1369 0 0         if ( $bak ) {
1370 0           unlink $bak;
1371             } else {
1372 0           warn "Error post processing $file. BCP file in $file.bak: $@\n";
1373 0           return;
1374             }
1375             }
1376 0           return $rows;
1377             }
1378              
1379             sub mk_view {
1380              
1381 0     0     my ($self,$table) = @_;
1382 0           my @opts;
1383 0 0         @opts = pop @_ if ref $_[-1];
1384 0 0 0       my $scratchdb = @opts ? $opts[0]{TempDb} || 'scratchdb' : 'scratchdb';
1385              
1386             # Sybase rounds money columns, need to bcp a view of it
1387             # if any exist.
1388 0           my $dbh = $self->{DBH};
1389              
1390 0           my $col_info = $self->column_info($table);
1391              
1392             # Columns might be a string from a SELECT clause
1393             # Or it might be an arrayref of columns
1394             my $col_list = ( @opts && $opts[0]{Columns} )
1395             ? $opts[0]{Columns}
1396 0 0 0       : $col_info->{LIST};
1397              
1398 0           my $col_map = $col_info->{MAP};
1399              
1400 0           my @columns;
1401 0           my $money_cnt = 0;
1402              
1403 0           my $column_str;
1404 0 0         if ( ref $col_list ) {
1405 0           for my $name (@$col_list) {
1406 0           my $col_name = $name;
1407 0 0         if ( my $info = $col_map->{$name} ) {
1408 0           my $type = $info->{TYPE_NAME};
1409 0           $col_name = $info->{COLUMN_NAME};
1410 0 0         if ($type =~ /money/) {
1411 0           $money_cnt++;
1412 0 0         my $len = ($type =~ /small/) ? 10 : 19;
1413 0           $col_name = "convert(decimal($len,4), $col_name) $col_name";
1414             }
1415             }
1416 0           push @columns, $col_name;
1417             }
1418 0           $column_str = join ",", @columns;
1419             } else {
1420 0           $column_str = $col_list;
1421             }
1422              
1423 0 0 0       return if $money_cnt==0 and !$opts[0]{Filter} and !$opts[0]{Columns};
      0        
1424              
1425 0           my ($view, $db_view);
1426              
1427 0           my $curr_db = $self->curr_db();
1428 0 0 0       if ( !$curr_db and $table =~ /^(\w+)\.\w*\.\w+$/ ) {
1429 0           $curr_db = $1;
1430             }
1431 0 0         confess "Can not determine database" unless $curr_db;
1432              
1433 0 0 0       my $base_table =
    0          
    0          
1434             (!$curr_db or $table =~ /^\w+\.\w*\.\w+$/) ? $table
1435             : ($table =~ /^\w+$/) ? "$curr_db..$table"
1436             : ($table =~ /^\w*\.\w+$/) ? "$curr_db.$table"
1437             : confess "Can not determine database for view";
1438              
1439 0           ( my $tmp_view = $base_table ) =~ s/.*\.//;
1440 0 0         $tmp_view = substr($tmp_view, 0, 19) if length($tmp_view) > 19;
1441              
1442 0 0         $dbh->do("USE $scratchdb") unless $self->is_iq();
1443              
1444 0           my $cnt;
1445 0           while (1) {
1446 0           my ($sec, $min, $hr) = localtime;
1447 0           my $id = sprintf("%05d%02d%02d%02d", $$, $hr, $min, $sec);
1448              
1449 0           $view = "${tmp_view}${id}";
1450 0 0         $db_view = $self->is_iq() ? $view : "$scratchdb..$view";
1451 0           my $sql = sprintf(
1452             "CREATE VIEW %s AS SELECT %s FROM %s",
1453             $view,
1454             $column_str,
1455             $base_table,
1456             );
1457 0 0 0       $sql .= " $opts[0]{Filter}" if @opts && $opts[0]{Filter};
1458 0           print "Creating view $db_view\n";
1459 0           print "Executing: $sql\n";
1460 0           my $result = eval { $dbh->do($sql) };
  0            
1461 0 0         return $view if $result;
1462 0 0         confess $@ unless $@ =~ /already an object/;
1463 0           $cnt++;
1464 0 0         confess "Too many retries trying to create view $db_view. Aborting"
1465             if $cnt > 20;
1466 0           print "View $db_view already exists, retrying #$cnt...";
1467 0           sleep 2;
1468             }
1469              
1470             }
1471              
1472             # Fix native date format from Sybase bcp out
1473             { my %mons = qw( Jan 1 Feb 2 Mar 3 Apr 4 May 5 Jun 6 Jul 7 Aug 8 Sep 9 Oct 10 Nov 11 Dec 12 );
1474             my $mon_str = join '|', keys %mons;
1475             my $mon_re = qr/$mon_str/;
1476              
1477             sub fix_bcp_file {
1478 0     0     my ( $self, $file ) = @_;
1479 0           my $opts = {};
1480 0 0         if (ref $_[-1]) {
1481 0           $opts = pop @_;
1482             }
1483 0   0       my $delimiter = $opts->{Delimiter} || $self->{DELIMITER} || '|';
1484 0           my $dre = quotemeta($delimiter);
1485 0           local ($_, $., $ARGV, *ARGV);
1486 0           local ( $^I, @ARGV ) = ( '.bak', $file );
1487 0   0       local $/ = $opts->{RowDelimiter} || $/;
1488 0           while ( <> ) {
1489 0           1 while s!(^|$dre)($mon_re)\s{1,2}(\d{1,2})\s(\d{4})\s\s?(\d\d?):(\d\d):(\d\d):(\d{3})([AP])M($dre|$/)!
1490             $1 .
1491             sprintf( '%04d-%02d-%02d %02d:%02d:%02d.%03d',
1492             $4,
1493 0 0 0       $mons{ $2 },
    0 0        
1494             $3,
1495             ( $9 eq 'P' && $5 < 12) ? $5 + 12 : ( $9 eq 'A' && $5 == 12 ) ? 0 : $5,
1496             $6,
1497             $7,
1498             $8 ) .
1499             $10
1500             !eg;
1501 0           1 while s!(^|$dre)($mon_re)\s{1,2}(\d{1,2})\s(\d{4})\s\s?(\d\d?):(\d\d)([AP])M($dre|$/)!
1502             $1 .
1503             sprintf( '%04d-%02d-%02d %02d:%02d',
1504             $4,
1505 0 0 0       $mons{ $2 },
    0 0        
1506             $3,
1507             ( $7 eq 'P' && $5 < 12) ? $5 + 12 : ( $7 eq 'A' && $5 == 12 ) ? 0 : $5,
1508             $6 ) .
1509             $8
1510             !eg;
1511 0           1 while s!(^|$dre)($mon_re)\s{1,2}(\d{1,2})\s(\d{4})($dre|$/)!
1512             $1 .
1513             sprintf( '%04d-%02d-%02d',
1514             $4,
1515 0           $mons{ $2 },
1516             $3 ) .
1517             $5
1518             !eg;
1519 0           print;
1520             }
1521 0           return "$file.bak";
1522             }
1523             }
1524              
1525             {
1526             my %type_map = ( 'V' => 'V', 'P' => 'P', 'U' => 'T' );
1527              
1528             sub obj_type {
1529 0     0     my ( $self, $name ) = @_;
1530 0           my $dbh = $self->{DBH};
1531 0           my $qname = $dbh->quote($name);
1532 0           my ( $type ) = $dbh->selectrow_array("select type from sysobjects where name = $qname");
1533 0 0         return unless $type;
1534 0   0       return $type_map{$type} || confess "Don't know about type $type for object $name";
1535             }
1536             }
1537              
1538             sub curr_db {
1539 0     0     my $self = shift;
1540              
1541 0           $self->get('db_name()');
1542             }
1543              
1544 0     0     sub curr_schema { undef }
1545              
1546             {
1547              
1548             # Can get errors in some databases if you don't add dbo to everything
1549             my $sql_t = <
1550             SELECT
1551             dbo.sysindexes.name,
1552             index_col(object_name(dbo.sysindexes.id), dbo.sysindexes.indid, dbo.syscolumns.colid) col_name
1553             FROM dbo.sysindexes, dbo.syscolumns
1554             WHERE dbo.sysindexes.id = dbo.syscolumns.id
1555             AND dbo.syscolumns.colid <= dbo.sysindexes.keycnt
1556             AND dbo.sysindexes.id = object_id(%s)
1557             SQL
1558              
1559             sub index_info {
1560 0     0     my ( $self, $table, $all_indexes ) = @_;
1561              
1562 0           my ($tmp_db, $curr_db) = (undef,'');
1563 0           my $dbh = $self->{DBH};
1564              
1565 0           my $schema = '';
1566 0           $tmp_db = $curr_db = $self->curr_db();
1567              
1568 0 0         if ( $table =~ /^(?:(\w+)\.)?(\w*)\.(\w+)$/ ) {
1569 0           ($tmp_db, $schema, $table) = ($1,$2,$3);
1570 0           $table = "$schema.$table";
1571              
1572             # We can only get info on the current database
1573 0 0 0       if ( defined($tmp_db) and $tmp_db ne $curr_db ) {
1574 0           $dbh->do("USE $tmp_db");
1575             }
1576             }
1577              
1578 0           my $sql = sprintf $sql_t, $dbh->quote($table);
1579 0 0         $sql .= "AND dbo.sysindexes.status & 2 = 2\n" unless $all_indexes;
1580 0           $sql .= "ORDER BY dbo.syscolumns.colid\n";
1581 0           my $sth = $dbh->prepare($sql);
1582 0           $sth->execute();
1583 0           my @col_names = @{$sth->{NAME_lc}};
  0            
1584 0           my %row; $sth->bind_columns(\@row{@col_names});
  0            
1585 0           my %ind;
1586 0           while ($sth->fetch()) {
1587 0 0         if ( $row{col_name} ) {
1588 0           push @{$ind{$row{name}}}, lc($row{col_name});
  0            
1589             }
1590             }
1591              
1592 0 0 0       $dbh->do("USE $curr_db") if defined($tmp_db) and $tmp_db ne $curr_db;
1593              
1594 0 0         return unless %ind;
1595 0           return \%ind;
1596             }
1597              
1598             }
1599              
1600             sub primary_key {
1601 0     0     my ( $self, $table ) = @_;
1602 0           my $schema;
1603 0           my ($tmp_db, $curr_db) = (undef,'');
1604 0           my $dbh = $self->{DBH};
1605              
1606 0           $tmp_db = $curr_db = $self->curr_db();
1607              
1608 0 0         if ( $table =~ /^(?:(\w+)\.)?(\w*)\.(\w+)$/ ) {
1609 0           ($tmp_db, $schema, $table) = ($1,$2,$3);
1610 0   0       $schema ||= undef;
1611              
1612             # We can only get column info on the current database
1613 0 0 0       $dbh->do("USE $tmp_db") if defined($tmp_db) and $tmp_db ne $curr_db;
1614             }
1615              
1616 0           my @pk = $self->{DBH}->primary_key($tmp_db, $schema, $table);
1617              
1618 0 0 0       $dbh->do("USE $curr_db") if defined($tmp_db) and $tmp_db ne $curr_db;
1619              
1620 0 0         return unless @pk;
1621              
1622 0           return \@pk;
1623             }
1624              
1625             {
1626              
1627             my $del_sql = <
1628             DELETE %s
1629             FROM %s d, %s s
1630             WHERE %s
1631             SQL
1632              
1633             my $ins_sql = <
1634             SELECT %s
1635             FROM %s
1636             SQL
1637              
1638             sub merge {
1639 0     0     my $self = shift;
1640 0           my %args = @_;
1641 0           my $dbh = $self->{DBH};
1642              
1643 0           my $table = lc($args{Table});
1644 0           my $stg_table = lc($args{StgTable});
1645              
1646 0           my $tbl_info = $self->column_info($table);
1647 0           my $tbl_map = $tbl_info->{MAP};
1648              
1649 0           my $stg_info = $self->column_info($stg_table);
1650 0           my $stg_map = $stg_info->{MAP};
1651              
1652 0           my %stg_has; $stg_has{$_}++ for @{$stg_info->{LIST}};
  0            
  0            
1653              
1654 0 0 0       my $key_col_ref = ($args{KeyCols} && @{$args{KeyCols}}) ? $args{KeyCols} : $self->key_columns($table);
1655 0 0 0       my $upd_col_ref = ($args{UpdCols} && @{$args{UpdCols}}) ? $args{UpdCols} : $self->upd_columns($table, $key_col_ref);
1656              
1657 0           my @key_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$key_col_ref;
1658 0           my %is_key_col;
1659 0           $is_key_col{$_}++ for map lc, @$key_col_ref;
1660 0           my @upd_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$upd_col_ref;
1661 0           my %is_upd_col;
1662 0           $is_upd_col{$_}++ for map lc, @$upd_col_ref;
1663              
1664 0           my %tmp_col_map;
1665 0 0         %tmp_col_map = map lc, %{$args{ColMap}} if $args{ColMap};
  0            
1666              
1667             # Column map for upd statement, which must map the correct case
1668             # to the correct case.
1669 0           my %col_map = map {( $_ => (
1670             $tmp_col_map{lc($_)}
1671             ? $stg_map->{lc($tmp_col_map{lc($_)})}{COLUMN_NAME}
1672             : $stg_map->{lc($_)}{COLUMN_NAME}
1673 0 0         ))} @key_cols, @upd_cols;
1674              
1675             # Correctly cased field list for bcp select from stage table statement
1676             # Either it's in the explicit column map, or it's a key or upd column
1677             # with the same name as the target table,
1678             # or it can be last_chg_user or date
1679             my @fields = map {
1680             ($_ eq 'last_chg_user' && !$stg_has{last_chg_user}) ? 'suser_name()'
1681             : ($_ eq 'last_chg_date' && !$stg_has{last_chg_date}) ? 'getdate()'
1682             : $tmp_col_map{$_} ? $stg_has{$tmp_col_map{$_}} ? $stg_map->{$tmp_col_map{$_}}{COLUMN_NAME} : $tmp_col_map{$_}
1683             : ( $is_key_col{$_} || $is_upd_col{$_} ) ? $stg_has{$_} ? $stg_map->{$_}{COLUMN_NAME} : ()
1684             : $stg_map->{$_} ? $stg_map->{$_}{COLUMN_NAME}
1685 0 0 0       : confess "Failed to map target column $table.$_"
    0 0        
    0 0        
    0          
    0          
    0          
    0          
1686 0           } @{$tbl_info->{LIST}};
  0            
1687 0           my $field_str = join(",", @fields);
1688              
1689 0   0       my $key_col_str = join("\nAND ", map "d.$_=s.".($col_map{$_}||$_), @key_cols);
1690              
1691 0           my $del_merge_sql = sprintf($del_sql,
1692             $table,
1693             $table, $stg_table,
1694             $key_col_str,
1695             );
1696 0           print("Executing: $del_merge_sql\n");
1697              
1698 0 0         unless ($args{NoExec}) {
1699 0           my $del_rows = $dbh->do($del_merge_sql) + 0;
1700 0           print("$del_rows rows deleted from $table\n\n");
1701             }
1702              
1703 0           my $ins_merge_sql = sprintf($ins_sql,
1704             $field_str,
1705             $stg_table,
1706             );
1707 0           print("Inserting to $table: $ins_merge_sql\n");
1708              
1709 0 0         return 1 if $args{NoExec};
1710              
1711 0 0 0       my $ins_rows = ( $args{NoBCP} or ($stg_table =~ /^#/) )
1712             ? $dbh->do("INSERT INTO $table\n$ins_merge_sql") + 0
1713             : $self->bcp_sql($table, $ins_merge_sql) + 0;
1714 0           print("$ins_rows rows inserted to $table\n\n");
1715              
1716 0           return 1;
1717             }
1718             }
1719              
1720             # This merge is destructive to the staging table
1721             # Only 'new' rows will be left in the staging table
1722             {
1723              
1724             my $upd_sql = <
1725             UPDATE %s
1726             SET %s
1727             FROM %s d,%s s
1728             WHERE %s
1729             SQL
1730              
1731             my $del_sql = <
1732             DELETE %s
1733             FROM %s s, %s d
1734             WHERE %s
1735             SQL
1736              
1737             my $ins_sql = <
1738             SELECT %s
1739             FROM %s
1740             SQL
1741              
1742             sub merge2 {
1743 0     0     my $self = shift;
1744 0           my %args = @_;
1745 0           my $dbh = $self->{DBH};
1746              
1747 0           my $table = lc($args{Table});
1748 0           my $stg_table = lc($args{StgTable});
1749              
1750 0           my $tbl_info = $self->column_info($table);
1751 0           my $tbl_map = $tbl_info->{MAP};
1752              
1753 0           my $stg_info = $self->column_info($stg_table);
1754 0           my $stg_map = $stg_info->{MAP};
1755 0           my %stg_has; $stg_has{$_}++ for @{$stg_info->{LIST}};
  0            
  0            
1756              
1757 0 0 0       my $key_col_ref = ($args{KeyCols} && @{$args{KeyCols}}) ? $args{KeyCols} : $self->key_columns($table);
1758 0 0 0       my $upd_col_ref = ($args{UpdCols} && @{$args{UpdCols}}) ? $args{UpdCols} : $self->upd_columns($table);
1759              
1760 0           my @key_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$key_col_ref;
1761 0           my %is_key_col;
1762 0           $is_key_col{$_}++ for map lc, @$key_col_ref;
1763 0           my @upd_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$upd_col_ref;
1764 0           my %is_upd_col;
1765 0           $is_upd_col{$_}++ for map lc, @$upd_col_ref;
1766              
1767 0           my %tmp_col_map;
1768 0 0         %tmp_col_map = map lc, %{$args{ColMap}} if $args{ColMap};
  0            
1769              
1770             # Column map for upd statement, which must map the correct case
1771             # to the correct case.
1772 0           my %col_map = map {( $_ => (
1773             $tmp_col_map{lc($_)}
1774             ? $stg_map->{lc($tmp_col_map{lc($_)})}{COLUMN_NAME}
1775             : $stg_map->{lc($_)}{COLUMN_NAME}
1776 0 0         ))} @key_cols, @upd_cols;
1777              
1778             # Correctly cased field list for bcp select from stage table statement
1779             # Either it's in the explicit column map, or it's a key or upd column
1780             # with the same name as the target table,
1781             # or it can be last_chg_user or date
1782             my @fields = map {
1783             ($_ eq 'last_chg_user' && !$stg_has{last_chg_user}) ? 'suser_name()'
1784             : ($_ eq 'last_chg_date' && !$stg_has{last_chg_date}) ? 'getdate()'
1785             : $tmp_col_map{$_} ? $stg_map->{$tmp_col_map{$_}}{COLUMN_NAME}
1786             : ( $is_key_col{$_} || $is_upd_col{$_} ) ? $stg_map->{$_}{COLUMN_NAME}
1787             : $stg_map->{$_} ? $stg_map->{$_}{COLUMN_NAME}
1788 0 0 0       : confess "Failed to map target column $table.$_"
    0 0        
    0 0        
    0          
    0          
1789 0           } @{$tbl_info->{LIST}};
  0            
1790 0           my $field_str = join(",", @fields);
1791              
1792 0   0       my $key_col_str = join("\nAND ", map "d.$_=s.".($col_map{$_}||$_), @key_cols);
1793 0   0       my $upd_col_str = join(",", map "$_=s.".($col_map{$_}||$_), @upd_cols);
1794              
1795             # Determine if last_chg_user, last_chg_date need to be updated
1796 0           my %chg_col = $self->last_chg_list($table, \@fields);
1797 0           for my $col ( sort { $b cmp $a } keys %chg_col ) {
  0            
1798 0 0         $upd_col_str .= ",$col=".( ($col eq 'last_chg_user') ? 'suser_name()' : 'getdate()');
1799             }
1800              
1801 0 0         unless ($args{InsertOnly}) {
1802 0           my $upd_merge_sql = sprintf($upd_sql,
1803             $table,
1804             $upd_col_str,
1805             $table, $stg_table,
1806             $key_col_str,
1807             );
1808 0           print("Executing: $upd_merge_sql\n");
1809              
1810 0 0         unless ($args{NoExec}) {
1811 0           my $upd_rows = $dbh->do($upd_merge_sql) + 0;
1812 0           print("$upd_rows rows updated in $table\n\n");
1813             }
1814             }
1815              
1816 0           my $del_merge_sql = sprintf($del_sql,
1817             $stg_table,
1818             $stg_table, $table,
1819             $key_col_str,
1820             );
1821 0           print("Executing: $del_merge_sql\n");
1822              
1823 0 0         unless ($args{NoExec}) {
1824 0           my $del_rows = $dbh->do($del_merge_sql) + 0;
1825 0           print("$del_rows rows deleted from $stg_table\n\n");
1826             }
1827              
1828 0           my $ins_merge_sql = sprintf($ins_sql,
1829             $field_str,
1830             $stg_table,
1831             );
1832 0           print("Inserting to $table: $ins_merge_sql\n");
1833              
1834 0 0         return 1 if $args{NoExec};
1835 0           my $ins_rows = $self->bcp_sql($table, $ins_merge_sql) + 0;
1836 0           print("$ins_rows rows inserted to $table\n\n");
1837              
1838 0           return 1;
1839             }
1840             }
1841              
1842             # BCP (via sqsh) the results of a sql select statement into a table
1843             sub bcp_sql {
1844 0     0     my $self = shift;
1845 0           my ($table,$sql) = @_;
1846              
1847 0           my $dbh = $self->{DBH};
1848 0           my $db = $dbh->{Name};
1849 0 0         $db =~ /server=(\w+)/ or confess "Can't determine server for bcp";
1850 0           my $server = $1;
1851 0           my $database = $self->curr_db();
1852              
1853 0           my $user = $dbh->{Username};
1854 0 0 0       my $bcp_table =
    0          
    0          
1855             (!$database or $table =~ /^\w+\.\w*\.\w+$/) ? $table
1856             : ($table =~ /^\w+$/) ? "$database..$table"
1857             : ($table =~ /^\w*\.\w+$/) ? "$database.$table"
1858             : confess "Can not determine database for sqsh/bcp";
1859              
1860 0           local $ENV{SQSH} = "-U $dbh->{Username} -P $self->{PASSWORD}";
1861 0           my $pid = open(my $fh, "-|");
1862 0 0         confess "Can't fork: $!" unless defined $pid;
1863 0 0         unless ($pid) {
1864              
1865             # sqsh needs library path set - make sure it is set
1866             # Don't know where it is in generic environment, or best way
1867             # to universally set this, or even if this is necessary in general...
1868             # local $ENV{LD_LIBRARY_PATH} = '/path/to/sybase/OCS-12_5/lib';
1869 0           my @cmd = (sqsh => -S => $server, -D => $database);
1870 0           my $sqsh_fh;
1871              
1872             # sqsh outputs to stderr
1873 0           open(STDERR, ">&STDOUT");
1874 0 0         unless ( open($sqsh_fh, "|-", @cmd) ) {
1875 0           warn "Unable to exec @cmd: $!";
1876 0           exit(1);
1877             }
1878 0           print $sqsh_fh "$sql\n";
1879 0           print $sqsh_fh "\\bcp -b 1000 $bcp_table\n";
1880              
1881 0           my $status = close $sqsh_fh;
1882 0 0         exit($status ? 0 : 1);
1883             }
1884 0           my $rows;
1885 0           local ($_, $.);
1886 0           my $cnt;
1887 0           while (<$fh>) {
1888 0 0         if (/^Batch successfully bulk-copied/) {
1889 0           $cnt += 1000;
1890 0 0         print "$cnt: $_" unless $cnt % 10_000;
1891 0           next;
1892             }
1893 0           print;
1894 0 0         $rows = $1 if /^\s*(\d+) rows copied/;
1895             }
1896 0           my $close_status = close $fh;
1897 0 0         confess "SQSH BCP error - no rows copied" unless defined $rows;
1898 0 0         confess "SQSH BCP error - $rows rows copied" unless $close_status;
1899              
1900             # Return true value
1901 0           return $rows;
1902             }
1903              
1904             # SQL to return table column defaults
1905             {
1906             my $sql = <
1907             SELECT c.name, d.text
1908             FROM dbo.syscolumns c, dbo.syscomments d
1909             WHERE c.id = object_id('%s')
1910             AND c.cdefault = d.id
1911             AND d.texttype = 0
1912             SQL
1913              
1914 0     0     sub default_sql { return $sql }
1915             }
1916              
1917             # Changed for Sybase v12 and multiple tempdbs
1918             sub temp_table_name {
1919 0     0     my ($self, $name) = @_;
1920              
1921 0           my $dbh = $self->{DBH};
1922 0           my ($spid) = $dbh->selectrow_array('select @@spid');
1923 0           print "SPid: $spid\n";
1924 0           my $who = $dbh->selectrow_hashref("exec sp_who '$spid'");
1925 0   0       my $tempdb = $who->{tempdbname} || 'tempdb';
1926 0           print "TempDb: $tempdb\n";
1927 0           my ($id) = $dbh->selectrow_array("select object_id('$tempdb..$name')");
1928 0           print "ID: $id\n";
1929 0           my ($real_name) = $dbh->selectrow_array("select object_name($id, db_id('$tempdb'))");
1930 0           print "RealName: $real_name\n";
1931 0           return "$tempdb..$real_name";
1932             }
1933              
1934             sub delete {
1935 0     0     my ($self, $table, $where, $limit) = @_;
1936              
1937 0           my $dbh = $self->{DBH};
1938 0   0       $dbh->{syb_rowcount} = $limit || 1000;
1939              
1940 0           my $sql = "DELETE FROM $table";
1941 0 0         $sql .= " WHERE $where" if $where;
1942              
1943 0           my ($rows, $tot_rows);
1944 0           my ($err, $err_msg);
1945              
1946 0           print "Executing: $sql\n";
1947 0           do {
1948              
1949 0           $rows = eval { $dbh->do($sql) };
  0            
1950 0 0         unless ($rows) {
1951 0           $err_msg = $@;
1952 0           $err++;
1953 0           $rows = 0;
1954             }
1955              
1956 0           $tot_rows += $rows;
1957 0 0         print "Deleted $tot_rows rows\n" if $rows > 0;
1958              
1959             } while $rows > 0;
1960              
1961 0           $dbh->{syb_rowcount} = 0;
1962              
1963 0 0         confess $err_msg if $err;
1964              
1965 0           print "$tot_rows rows deleted from $table\n";
1966 0           return $tot_rows;
1967             }
1968              
1969              
1970             package DBIx::BulkUtil::SybaseIQ;
1971              
1972 1     1   5 use Carp qw(confess);
  1         1  
  1         45  
1973 1     1   3 use Cwd qw(abs_path);
  1         1  
  1         805  
1974              
1975             our @ISA = qw(DBIx::BulkUtil::Sybase);
1976              
1977             {
1978              
1979             my $sql = <
1980             LOAD TABLE %s
1981             (
1982             %s
1983             )
1984             FROM
1985             %s
1986             QUOTES OFF
1987             ESCAPES OFF
1988             SQL
1989              
1990             sub bcp_in {
1991 0     0     my $self = shift;
1992 0           my $table = shift;
1993              
1994 0 0         my $opts = (ref $_[-1]) ? pop @_ : {};
1995              
1996 0           my @files = @_;
1997              
1998 0 0         push @files, "$table.bcp" unless @files;
1999              
2000 0           my $dbh = $self->{DBH};
2001              
2002 0   0       my $delimiter = $opts->{Delimiter} || $self->{DELIMITER};
2003 0   0       my $row_delimiter = $opts->{RowDelimiter} || "\n";
2004              
2005 0           my $id_cnt;
2006 0   0       my $mode = $opts->{Action} || "A";
2007 0 0         if ( $mode eq 'T' ) {
    0          
2008 0           my $sql = "TRUNCATE TABLE $table";
2009 0           print "Executing: $sql\n";
2010 0           $dbh->do($sql);
2011             } elsif ($mode eq 'R') {
2012 0           my $sql = "DELETE FROM $table";
2013 0           print "Executing: $sql\n";
2014 0           $dbh->do($sql);
2015             }
2016              
2017 0           my @bcp_list;
2018 0           for my $file (@files) {
2019 0 0         confess "BCP file $file does not exist" unless -f $file;
2020 0 0         unless ( -s _ ) {
2021 0           print "$file is empty. Skipping ...\n";
2022 0           next;
2023             }
2024 0           push @bcp_list, $file;
2025             }
2026              
2027 0 0         unless ( @bcp_list ) {
2028 0           print "All files are empty. Skipping bcp of $table\n";
2029              
2030             # Make any log file parsers happy
2031 0           print "0 rows copied\n";
2032 0           return 0;
2033             }
2034              
2035 0           my $info = $self->column_info($table);
2036 0 0 0       my $col_list = ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? $opts->{ColumnList} : $info->{LIST};
2037 0 0         my @filler = $opts->{Filler} ? @{$opts->{Filler}} : ();
  0            
2038 0           my %is_filler;
2039 0           $is_filler{$_}++ for @filler;
2040              
2041             # Convert empty string to NULL
2042             # Should be default but we don't want to break existing apps
2043 0 0         my $null_blanks = $self->{NoBlankNull} ? ' NULL(BLANKS)' : '';
2044              
2045             # Columns that we will let default to the schema default
2046 0   0       my $dflt = $opts->{Default} || [];
2047 0           my %dflt; $dflt{$_}++ for @$dflt;
  0            
2048              
2049 0   0       my $constant = $opts->{Constants} || {};
2050              
2051 0   0       my @list = grep !defined($constant->{$_})&&!$dflt{$_}, @$col_list;
2052 0           my $last_col = $list[-1];
2053              
2054             # It is best to explicitly put the row delimiter on the last column
2055             my $load_sql = sprintf(
2056             $sql,
2057             $table,
2058             join( ",\n", map {
2059             defined($constant->{$_}) ? qq( [$_] DEFAULT '$constant->{$_}')
2060             : ( $_ ne $last_col )
2061             ? $is_filler{$_} ? qq( FILLER('$delimiter')) : qq( [$_] '$delimiter'$null_blanks)
2062             : ( $opts->{TrailingDelimiter} )
2063             ? $is_filler{$_} ? qq( FILLER('$delimiter$row_delimiter')) : qq( [$_] '$delimiter$row_delimiter'$null_blanks)
2064 0 0         : $is_filler{$_} ? qq( FILLER('$row_delimiter')) : qq( [$_] '$row_delimiter'$null_blanks)
    0          
    0          
    0          
    0          
    0          
2065             } grep !$dflt{$_}, @$col_list),
2066 0           join( ",\n ", map { "'". abs_path($_) . "'" } @bcp_list),
  0            
2067             );
2068              
2069 0 0         $load_sql .= "SKIP $opts->{Header}\n" if $opts->{Header};
2070              
2071             # '0' indicates unlimited errors to IQ, but will be skipped here since '0' is false
2072             # That's okay, '00' might work (it is 'true' and == 0).
2073 0 0         $load_sql .= "IGNORE CONSTRAINT ALL $opts->{MaxErrors}\n" if $opts->{MaxErrors};
2074              
2075 0           my $db = $dbh->{Name};
2076 0 0         $db =~ /server=(\w+)/ or confess "Can't determine server for bcp";
2077 0           my $server = $1;
2078 0           my $database = $self->curr_db();
2079              
2080 0           print "Loading $server/$database/$table\n";
2081 0           print "Executing: $load_sql\n";
2082 0           my $rows = $dbh->do($load_sql) + 0;
2083 0           print "$rows rows copied\n";
2084 0           return $rows;
2085             }
2086             }
2087              
2088             {
2089             my $sql = <
2090             SELECT cname, default_value
2091             FROM sys.syscolumns
2092             WHERE tname = '%s'
2093             AND default_value IS NOT NULL
2094             SQL
2095              
2096 0     0     sub default_sql { return $sql }
2097             }
2098              
2099             # Because SybaseIQ can not do sqsh
2100             sub bcp_sql {
2101 0     0     my ($self, $table, $sql) = @_;
2102 0           my $do_sql = "INSERT INTO $table\n$sql";
2103 0           $self->{DBH}->do("INSERT INTO $table\n$sql");
2104             }
2105              
2106 0     0     sub is_iq {1}
2107              
2108             sub index_info {
2109 0     0     my ( $self, $table, $all_indexes ) = @_;
2110              
2111 0           my $dbh = $self->{DBH};
2112              
2113 0           my $sql = "exec sp_iqindex [$table]";
2114 0           my $sth = $dbh->prepare($sql);
2115 0           $sth->execute();
2116 0           my @col_names = @{$sth->{NAME_lc}};
  0            
2117 0           my %row; $sth->bind_columns(\@row{@col_names});
  0            
2118 0           my %ind;
2119 0           while ($sth->fetch()) {
2120 0 0 0       next if !$all_indexes and $row{unique_index} ne 'Y';
2121 0           $ind{$row{index_name}} = [ split /,/, $row{column_name} ];
2122             }
2123              
2124 0 0         return unless %ind;
2125 0           return \%ind;
2126             }
2127              
2128             package DBIx::BulkUtil::Oracle;
2129              
2130 1     1   12 use Carp qw(confess);
  1         1  
  1         37  
2131 1     1   3 use Cwd qw(abs_path);
  1         1  
  1         4959  
2132              
2133             our @ISA = qw(DBIx::BulkUtil::Obj);
2134              
2135 0     0     sub now { 'systimestamp' }
2136              
2137             sub add {
2138 0     0     my $self = shift;
2139 0           my $date = shift;
2140 0           while (my ( $n, $unit ) = splice( @_, 0, 2 ) ) {
2141 0           $date .= " + numtodsinterval( $n, '$unit' )";
2142             }
2143 0           return $date;
2144             }
2145              
2146             {
2147             my %intervals = (
2148             year => '/ 365',
2149             month => '/ 30',
2150             hour => '* 24',
2151             minute => '* 24 * 60',
2152             second => '* 24 * 60 * 60',
2153             );
2154              
2155             sub diff {
2156 0     0     my $self = shift;
2157 0           my $date1 = shift;
2158 0           my $date2 = shift;
2159 0           my $unit = shift;
2160 0           my $diff_str = "$date2 - $date1";
2161 0 0         if (my $str = $intervals{$unit}) {
2162 0           $diff_str = "($diff_str) $str";
2163             }
2164 0           return "trunc($diff_str)";
2165             }
2166             }
2167              
2168             # This is necessary when you want to use a literal
2169             # date in a datetime calculation
2170             sub to_datetime {
2171 0     0     my $self = shift;
2172 0           my $date = shift;
2173              
2174 0           return "to_timestamp('$date', 'YYYY-MM-DD HH24:MI:SS.FF')";
2175             }
2176              
2177             # Don't need this with new version of DBI/DBD
2178             #sub to_char {
2179             # my $self = shift;
2180             # my $date = shift;
2181             # return "to_char($date, 'YYYY-MM-DD HH24:MI:SS')";
2182             #}
2183             #
2184             #sub fmt { return $_[1] }
2185              
2186             sub row_select {
2187 0     0     my $self = shift;
2188 0           my $sel = shift;
2189 0           return "select $sel from dual";
2190             }
2191              
2192             sub sp_sth {
2193 0     0     my $self = shift;
2194 0           my $sth = $self->{DBH}->prepare($self->sp_sql(@_));
2195 0           $sth->bind_param_inout(":cursor", \my $sth2, 0, { ora_type => DBD::Oracle::ORA_RSET() });
2196 0           $sth->execute();
2197 0           return $sth2;
2198             }
2199              
2200             sub sp_sql {
2201 0     0     my $self = shift;
2202 0           my ($stored_proc, @args) = @_;
2203             return
2204             "BEGIN\n$stored_proc(" .
2205 0 0         join(",", map { /^:cursor$/ ? $_ : $self->{DBH}->quote($_) } @args) .
  0            
2206             ");\nEND;\n";
2207             }
2208              
2209             {
2210              
2211             my %action_map = (
2212             A => "APPEND",
2213             R => "REPLACE",
2214             T => "TRUNCATE",
2215             );
2216              
2217             sub bcp_in {
2218 0     0     my $self = shift;
2219 0           my $opts = {};
2220 0 0         if (ref $_[-1]) {
2221 0           $opts = pop @_;
2222             }
2223 0   0       my $action_opt = uc($opts->{Action} || "A");
2224              
2225 0           my ( $table, @files ) = @_;
2226              
2227 0 0         my $partition = ( $table =~ s/:(\w+)$// ) ? $1 : '';
2228              
2229 0           my $dbh = $self->{DBH};
2230              
2231 0           my $stdin = $opts->{Stdin};
2232 0 0 0       @files = "$table.bcp" if !@files && !$stdin;
2233              
2234 0           my $has_stdin;
2235 0           for my $file (@files) {
2236 0 0         if ( $file eq "-" ) {
2237 0           $has_stdin++;
2238 0           next;
2239             }
2240 0 0         confess "BCP file $file does not exist" unless -f $file;
2241             }
2242              
2243 0 0 0       if ( $has_stdin && !$stdin ) {
    0 0        
2244 0           $stdin = \*STDIN;
2245             } elsif ( $stdin && !$has_stdin ) {
2246 0           push @files, "-";
2247             }
2248              
2249             # Save some work, skip load on empty file
2250             # Let sqlldr do a heavy handed truncate or delete
2251             # if that is the chosen action
2252 0 0         my @bcp_files = grep { $_ eq "-" or -s } @files;
  0            
2253              
2254 0 0         if ( !@bcp_files ) {
2255 0 0         if ( $action_opt eq 'A') {
2256 0           print "$files[0],... is empty. Skipping sqlldr\n";
2257              
2258             # Make any log file parsers happy
2259 0           print "0 Rows successfully loaded\n";
2260 0           return 0;
2261             }
2262              
2263             # Need some files if we run sqlldr
2264 0           @bcp_files = @files;
2265             }
2266 0           require File::Temp;
2267              
2268 0   0       my $constants = $opts->{Constants} || {};
2269 0           my %const = map { uc($_) => $constants->{$_} } keys %$constants;
  0            
2270              
2271 0   0       my $sizes = $opts->{CharSizes} || {};
2272 0           my %char_sizes = map { uc($_) => $sizes->{$_} } keys %$sizes;
  0            
2273              
2274 0   0       my $keep_temp = $opts->{KeepTempFiles} || $opts->{Debug};
2275 0   0       my $in_temp_dir = $opts->{TempDir} || $opts->{Debug};
2276 0           my $temp_dir;
2277 0 0 0       $temp_dir = $opts->{TempDir} || "." if $in_temp_dir;
2278              
2279 0 0         my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
2280 0 0         my @unlink = $keep_temp ? (UNLINK => 0) : ();
2281 0           my $ctl_fh = File::Temp->new(
2282             TEMPLATE => "${table}_XXXXX",
2283             SUFFIX => ".ctl",
2284             @temp_dir, @unlink,
2285             );
2286 0           chmod(0664, $ctl_fh->filename());
2287 0           my $bad_fh = File::Temp->new(
2288             TEMPLATE => "${table}_XXXXX",
2289             SUFFIX => ".bad",
2290             @temp_dir, @unlink,
2291             );
2292 0           chmod(0664, $bad_fh->filename());
2293 0           my $log_fh = File::Temp->new(
2294             TEMPLATE => "${table}_XXXXX",
2295             SUFFIX => ".log",
2296             @temp_dir, @unlink,
2297             );
2298 0           chmod(0664, $log_fh->filename());
2299 0 0         my $prm_fh = $stdin ? File::Temp->new(
2300             TEMPLATE => "${table}_XXXXX",
2301             SUFFIX => ".prm",
2302             @temp_dir,
2303             ) : undef;
2304              
2305             # NLS date format env variable does not work
2306             # for sqlldr.
2307             # So we must determine date fields and
2308             # specify the format in the control file.
2309 0           my $db = $self->{DBH}->{Name};
2310 0           my $user = $dbh->{Username};
2311 0           my ($schema, $tbl_name) = split /\./, uc($table);
2312 0 0         if (!$tbl_name) {
2313 0           $tbl_name = $schema;
2314 0           $schema = $self->curr_schema();
2315             }
2316              
2317 0           my $sth = $dbh->column_info(undef, $schema, $tbl_name, undef);
2318 0           my @info_names = @{$sth->{NAME_uc}};
  0            
2319 0           my %row; $sth->bind_columns(\@row{@info_names});
  0            
2320 0           my (@columns, %is_date, %char_sz, %is_lob);
2321 0 0         print "ColumnName Type Size\n" if $opts->{Debug};
2322 0 0         print "----------------\n" if $opts->{Debug};
2323 0           while ($sth->fetch()) {
2324 0           push @columns, $row{COLUMN_NAME};
2325 0 0         print "$row{COLUMN_NAME}\t$row{TYPE_NAME}\t$row{COLUMN_SIZE}\n" if $opts->{Debug};
2326 0 0         $char_sz{$row{COLUMN_NAME}} = exists($char_sizes{$row{COLUMN_NAME}}) ? $char_sizes{$row{COLUMN_NAME}} : $row{COLUMN_SIZE} if $row{TYPE_NAME} =~ /CHAR/;
    0          
2327 0 0         $char_sz{$row{COLUMN_NAME}} = exists($char_sizes{$row{COLUMN_NAME}}) ? $char_sizes{$row{COLUMN_NAME}} : 20_000_000, $is_lob{$row{COLUMN_NAME}} = 1 if $row{TYPE_NAME} =~ /TEXT|LOB|XML/;
    0          
2328 0 0         $is_date{$row{COLUMN_NAME}} = $1 if $row{TYPE_NAME} =~ /(DATE|TIMESTAMP)/;
2329             }
2330 0 0         confess("Table $schema.$tbl_name not found in database $db") unless @columns;
2331              
2332             # Find date formats in file, remove constants from column list
2333 0           my %date_fmt;
2334             my @file_columns = grep !defined($const{$_}),
2335 0 0 0       ( ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? ( map uc, @{$opts->{ColumnList}} ) : @columns );
  0            
2336 0 0         if (%is_date) {
2337             # We don't want to sample rows from stdin
2338 0           my @real_files = grep { $_ ne "-" } @files;
  0            
2339 0 0         %date_fmt = $self->date_masks_from_file( \@real_files, \@file_columns, \%is_date, $opts)
2340             if @real_files;
2341             }
2342              
2343 0 0         my $row_delim_str = $opts->{RowDelimiter} ? qq("str '$opts->{RowDelimiter}'"\n) : '';
2344              
2345 0   0       my $delimiter = $opts->{Delimiter} || $self->{DELIMITER};
2346 0   0       my $action = $action_map{$action_opt} || "APPEND";
2347 0           my $direct_load_pre = '';
2348 0           my $direct_load_post = '';
2349              
2350 0           my $sqlldr_opts = '';
2351 0   0       my $max_errors = $opts->{MaxErrors} || 0;
2352 0           $sqlldr_opts .= "ERRORS=$max_errors";
2353 0 0         $sqlldr_opts .= ", SKIP=$opts->{Header}" if $opts->{Header};
2354              
2355 0 0         if ($opts->{DirectPath}) {
2356 0 0         my $parallel = ( uc($opts->{DirectPath}) eq 'P' ) ? ", PARALLEL=TRUE" : '';
2357 0           $direct_load_pre = "OPTIONS(DIRECT=TRUE$parallel, ROWS=1000000, $sqlldr_opts)\nUNRECOVERABLE\n";
2358 0           $direct_load_post = "REENABLE DISABLED_CONSTRAINTS\n";
2359             } else {
2360 0   0       my $commit_rows = $opts->{CommitSize} || 2000;
2361 0           $direct_load_pre = "OPTIONS (ROWS=$commit_rows, BINDSIZE=5000000, READSIZE=20970000, $sqlldr_opts)\n";
2362             }
2363             my $default_date_fmt =
2364             $opts->{SybaseDateFmt} ? 'MON DD YYYY HH12:MI:SS:FF3AM'
2365             : $opts->{DateFormat} ? $opts->{DateFormat}
2366 0 0         : 'YYYY-MM-DD HH24:MI:SS.FF3'
    0          
2367             ;
2368 0           for ( keys %is_date ) {
2369 0   0       $date_fmt{$_} ||= $default_date_fmt;
2370 0 0         $is_date{$_} = 'TIMESTAMP' if $date_fmt{$_} =~ /FF|TZ[DHMR]/;
2371             }
2372             my $quote_str = $opts->{QuoteFields}
2373 0 0         ? qq( OPTIONALLY ENCLOSED BY '"')
2374             : ''
2375             ;
2376 0 0         if ( $opts->{LoadWhen} ) {
2377 0           $direct_load_post .= "WHEN $opts->{LoadWhen}\n";
2378             }
2379              
2380 0           my $nls_str = '';
2381 0 0         $nls_str = "CHARACTERSET $opts->{NLSLang}" if $opts->{NLSLang};
2382 0 0         $nls_str .= " LENGTH SEMANTICS $opts->{Semantics}" if $opts->{Semantics};
2383 0 0         $nls_str .= "\n" if $nls_str;
2384              
2385 0           my %sybase_type;
2386 0 0         @sybase_type{@file_columns} = @{$opts->{SybaseTypes}} if $opts->{SybaseTypes};
  0            
2387             # Logic for trimming or preserving blanks on char/varchar columns
2388             my $blank_control = sub {
2389 0     0     my $size = $char_sz{$_};
2390             return " $_ CHAR($size) PRESERVE BLANKS"
2391 0 0 0       if $opts->{PreserveBlanks} or $size == 1;
2392 0 0         if ( $opts->{SybaseTypes} ) {
2393             # On the off chance a Sybase char column becomes an Oracle BLOB
2394 0 0         return qq[ $_ CHAR($size)] if $is_lob{$_};
2395 0 0         return qq[ $_ CHAR($size) "NVL(RTRIM(:$_),' ')"] if $sybase_type{$_} eq 'char';
2396             } else {
2397 0 0         return qq[ $_ CHAR($size)] if $is_lob{$_};
2398 0 0         return qq[ $_ CHAR($size) "NVL(RTRIM(:$_),' ')"] if $opts->{TrimBlanks};
2399             }
2400 0           return " $_ CHAR($size)";
2401 0           };
2402              
2403 0   0       my $field_ref = $opts->{FieldRef} || {};
2404             my %field_ref = map {
2405 0           my $col = $_;
  0            
2406 0           my $tmp = $field_ref->{$col};
2407 0 0         my $v = ( $tmp =~ s/^~// ) ? "POSITION $tmp" : qq("$tmp");
2408 0           uc($col) => $v;
2409             } keys %$field_ref;
2410              
2411             # Field ref columns that don't reference themselves
2412             # will be considered similar to constant columns, but they must come
2413             # last, otherwise column alignment will be off
2414 0           my %field_ref_const;
2415 0           for ( keys %field_ref ) {
2416 0 0         next if $field_ref{$_} =~ /:$_\b/i;
2417 0           $field_ref_const{$_}++;
2418             }
2419 0           @columns = ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? map uc($_), @{$opts->{ColumnList}} : (
2420 0 0 0       (grep !$field_ref_const{$_}, @columns),
2421             keys %field_ref_const,
2422             );
2423 0           my %is_filler;
2424 0 0         if ( $opts->{Filler} ) {
2425 0           $is_filler{uc($_)}++ for @{$opts->{Filler}};
  0            
2426             }
2427              
2428 0           my $file_str = join(",", @bcp_files);
2429 0           my $sqlldr_file_str = join("\n", map "INFILE '$_'", @bcp_files);
2430 0           my $disp_table = my $sqlldr_table = $table;
2431              
2432 0 0         if ($partition) {
2433 0           $sqlldr_table .= " PARTITION ($partition)";
2434 0           $disp_table .= ":$partition";
2435             }
2436              
2437             # Default charset is roman8 on HP
2438             # Must set it here
2439             printf $ctl_fh
2440             $direct_load_pre.
2441             "LOAD DATA\n".
2442             #"CHARACTERSET WE8ROMAN8\n".
2443             $nls_str.
2444             "%s\n".
2445             $row_delim_str.
2446             "INTO TABLE %s %s\n".
2447             $direct_load_post.
2448             qq(FIELDS TERMINATED BY '$delimiter'$quote_str\n).
2449             "TRAILING NULLCOLS\n".
2450             "(\n%s\n)\n",
2451             $sqlldr_file_str,
2452             $sqlldr_table,
2453             $action,
2454             join(",\n", map {
2455 0           (
2456             exists($const{$_}) ? qq[ $_ CONSTANT '$const{$_}']
2457             : exists($is_filler{$_}) ? qq[ $_ FILLER]
2458             : exists($field_ref{$_}) ? qq[ $_ $field_ref{$_}]
2459             : $is_date{$_} ? " $_ $is_date{$_} '$date_fmt{$_}'"
2460 0 0         : $char_sz{$_} ? $blank_control->()
    0          
    0          
    0          
    0          
2461             : " $_"
2462             )
2463             } @columns);
2464 0 0         if ($prm_fh) {
2465 0           print $prm_fh "userid=$user/$self->{PASSWORD}\@$db\n";
2466 0           close $prm_fh;
2467             }
2468              
2469 0           $_->close() for $ctl_fh, $bad_fh, $log_fh;
2470              
2471 0           print "Loading $db..$disp_table from $file_str\n";
2472              
2473 0           my $ctl_file = $ctl_fh->filename();
2474 0           my $bad_file = $bad_fh->filename();
2475 0           my $log_file = $log_fh->filename();
2476 0 0         my $prm_file = $prm_fh ? $prm_fh->filename() : undef;
2477 0 0         if ($keep_temp) {
2478 0           print "SqlldrControlFile: ", abs_path($ctl_file), "\n";
2479 0           print "SqlldrBadRowFile : ", abs_path($bad_file), "\n";
2480 0           print "SqlldrLogFile : ", abs_path($log_file), "\n";
2481             }
2482 0           local $ENV{NLS_DATE_FORMAT} = 'YYYY-MM-DD HH24:MI:SS';
2483 0           local $ENV{NLS_TIMESTAMP_FORMAT} = 'YYYY-MM-DD HH24:MI:SS.FF';
2484 0           local $ENV{NLS_TIMESTAMP_TZ_FORMAT} = 'YYYY-MM-DD HH24:MI:SS.FF';
2485              
2486 0           my @prm_opt;
2487 0 0         @prm_opt = "parfile=$prm_file" if $prm_file;
2488 0           my @cmd = (
2489             sqlldr =>
2490             "control=$ctl_file",
2491             "log=$log_file",
2492             "bad=$bad_file",
2493             @prm_opt,
2494             "silent=(header,discards,feedback,partitions)",
2495             );
2496 0 0 0       print "Executing: @cmd\n" if $opts->{Debug} || $opts->{NoExec};
2497 0 0         return "@cmd" if $opts->{NoExec};
2498              
2499 0           my $close_success;
2500              
2501             # We could do this either way with IPC::Run
2502             # But lets not require it unless necessary.
2503 0 0         if ($stdin) {
2504 0           require IPC::Run;
2505              
2506 0           $close_success = IPC::Run::run( \@cmd, '<', $stdin );
2507             } else {
2508             # Hide user/passwd from ps
2509 0 0         open(my $cmd_fh, "|-", @cmd) or confess "Could not exec sqlldr: $!";
2510 0           print $cmd_fh "$user/$self->{PASSWORD}\@$db\n";
2511              
2512             # We don't want to exit right away on failure
2513             # We want to see the log file and bad record if any
2514 0           $close_success = close $cmd_fh;
2515             }
2516              
2517             # We don't want to exit right away on failure
2518             # We want to see the log file and bad record if any
2519 0           my $exit_stat = $? >> 8;
2520 0           my $exit_sig = $? & 127;
2521 0           my $exit_core = $? & 128;
2522              
2523             # We have a limit of one rejected row. If we have a bad row
2524             # we'll just include it in the error.
2525             # Oops thats no longer true now that we have a MaxErrors option
2526             # Just show the first bad row if we allow > 1 error
2527 0           my $bad_row;
2528 0 0         if ( -s $bad_file ) {
2529 0 0         if ( $max_errors > 0 ) {
2530 0           local ($_, $.);
2531 0   0       local $/ = $opts->{RowDelimiter} || "\n";
2532 0 0         open(my $fh, "<", $bad_file) or confess "Can't open sqlldr reject file $bad_file: $!";
2533 0           $bad_row = <$fh>;
2534 0           close $fh;
2535             } else {
2536 0           warn "sqlldr error loading $file_str into $disp_table on row:\n";
2537 0           $bad_row = `cat $bad_file`;
2538             }
2539             }
2540 0 0         open(my $fh, "<", $log_file) or confess "Can't open sqlldr log $log_file: $!";
2541 0           print "Opened $log_file\n";
2542 0           local ($_, $.);
2543 0           my ( $rows, $error_rows, $failed_rows, $null_rows, $error_msg, $discontinued, $dp_errors );
2544              
2545             # Only save first 1000 errors
2546 0           my $err_cnt = 0;
2547 0           while (<$fh>) {
2548 0           print;
2549 0 0         if ( /^\s*(\d+)/ ) {
2550 0           my $tmp_rows = $1;
2551 0 0         $rows = $tmp_rows if /successfully loaded/;
2552 0 0         $error_rows = $tmp_rows if /not loaded due to data errors/;
2553 0 0         $failed_rows = $tmp_rows if /not loaded because all WHEN clauses/;
2554 0 0         $null_rows = $tmp_rows if /not loaded because all fields were null/;
2555 0           next;
2556             }
2557 0 0         if ( /^Record \d+: Rejected/ ) {
2558 0 0         $error_msg .= $_ if $err_cnt < 1000;
2559 0           next;
2560             }
2561 0 0         if ( /^(?:SQL\*Loader|ORA)-\d+:/ ) {
2562 0 0         $error_msg .= $_ if ++$err_cnt <= 1000;
2563 0 0         $discontinued++ if /discontinued|aborted/;
2564 0           next;
2565             }
2566              
2567             # Catch direct path errors
2568 0 0         if ( /was not re-(?:enabled|validated)/ ) {
2569             # These errors do not cause non-zero exit status
2570 0           $dp_errors++;
2571 0 0         $error_msg .= $_ if $err_cnt < 1000;
2572 0           next;
2573             }
2574 0 0         if ( /^index \S+ was made unusable/ ) {
2575 0           $dp_errors++;
2576 0 0         $error_msg .= $_ if ++$err_cnt <= 1000;
2577 0           next;
2578             }
2579              
2580             }
2581 0           close $fh;
2582              
2583 0 0 0       if (!$close_success or $dp_errors) {
2584 0   0       $error_msg ||= '';
2585 0 0 0       if ( $exit_stat != 0 or $dp_errors ) {
2586 0 0 0       if ( $exit_stat == 2 or $dp_errors ) {
2587             # Exit status 2 is just a warning
2588             # But we should consider it an error if we exceeded the max errors allowed
2589             # Or if load was discontinued for any reason
2590             # Or for any direct path errors
2591 0 0         my $first = ($max_errors > 0) ? 'first ' : '';
2592 0 0         confess "sqlldr exited with status $exit_stat [$error_msg]" if $dp_errors;
2593 0 0         confess "sqlldr exited with status $exit_stat [$error_msg] - ${first}rejected record:[$bad_row]" if $error_rows > $max_errors;
2594 0 0         confess "sqlldr exited with status $exit_stat [$error_msg]" if $discontinued;
2595             } else {
2596 0           confess "sqlldr exited with status $exit_stat [$error_msg]";
2597             }
2598             }
2599 0 0         confess "sqlldr received signal $exit_sig [$error_msg]" if $exit_sig > 0;
2600 0 0         confess "sqlldr coredumped [$error_msg]" if $exit_core;
2601             }
2602 0           return $rows;
2603             }
2604             }
2605              
2606             # Dummy method for compatibility with Sybase
2607       0     sub mk_view { }
2608              
2609             sub date_masks_from_file {
2610 0     0     my $self = shift;
2611 0           my ($files, $columns, $is_date, $opts) = @_;
2612              
2613 0 0 0       return unless $is_date and %$is_date;
2614              
2615 0   0       $opts ||= {};
2616              
2617 0   0       my $sample_rows = $opts->{DateSampleRows} || 1000;
2618 0   0       my $d = $opts->{Delimiter} || $self->{DELIMITER};
2619 0           my $rd = $opts->{RowDelimiter};
2620 0   0       my $year_mask = $opts->{Year2Mask} || 'YY';
2621              
2622 0           local ($., $_, $ARGV, *ARGV);
2623 0 0         local $/ = $rd if $rd;
2624 0           local @ARGV = @$files;
2625              
2626 0           my $row_cnt;
2627 0           my (%remaining, %got);
2628 0           $remaining{$_}++ for keys %$is_date;
2629              
2630 0           my %fmt;
2631 0   0       my $dc_fmt = $opts->{DateColumnFmt} || {};
2632 0           for my $col ( keys %$dc_fmt ) {
2633 0           my $c = uc($col);
2634 0           $fmt{$c} = $dc_fmt->{$col};
2635 0           delete $remaining{$c}
2636             }
2637              
2638 0           my %row;
2639 0           while (<>) {
2640 0 0 0       next if $opts->{Header} and $. <= $opts->{Header};
2641 0           chomp;
2642 0 0         @row{@$columns} = $opts->{QuoteFields} ? split_quoted( $_, $d ) : split /\Q$d/;
2643 0           for (keys %remaining) {
2644 0 0         if ( $row{$_} ) {
2645 0           delete $remaining{$_};
2646 0           $got{$_} = $row{$_};
2647 0 0         last if !%remaining;
2648             }
2649             }
2650              
2651             # If we haven't found values by now, give up
2652 0 0         last if ++$row_cnt >= $sample_rows;
2653             }
2654              
2655 0           $fmt{$_} = $self->date_mask($got{$_}, $year_mask) for keys %got;
2656              
2657 0           return %fmt;
2658             }
2659              
2660             # If we allow quoted fields, need to split correctly and
2661             # handle embedded quotes and delimiters
2662             sub split_quoted {
2663 0     0     my ($line,$d) = @_;
2664 0           my @result;
2665 0           while ( $line =~ s/\A("?)((?:""|.)*?)\1(\Q$d\E|\z)//s ) {
2666 0           my ( $q, $s,$got_d ) = ( $1, $2, $3 );
2667 0 0         $s =~ s/""/"/g if $q;
2668 0           push @result, $s;
2669 0 0         last if length($got_d) == 0;
2670             }
2671 0           return @result;
2672             }
2673              
2674             {
2675              
2676             my @mon = qw( Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec );
2677             my $mon_str = join("|", @mon);
2678             my $mon_re = qr/(?i)$mon_str/;
2679             my @months = qw( January February March April May June July August September October November December );
2680             my $month_str = join("|", @months);
2681             my $month_re = qr/(?i)$month_str/;
2682             my @days = qw( Mon Tue Wed Thu Fri Sat Sun );
2683             my $day_str = join("|", @days);
2684             my $day_re = qr/(?i)$day_str/;
2685              
2686             sub date_mask {
2687 0     0     my ($self, $str, $year2mask) = @_;
2688 0 0         return unless $str;
2689 0           local $_ = $str;
2690              
2691 0           my $fmt = '';
2692 0   0       $year2mask ||= 'YY';
2693              
2694             # YYYY-MM-DD or YYYYMMDD
2695 0 0         if ( s/^\d{4}(\D?)\d\d(\D?)\d\d// ) {
2696 0           $fmt .= "YYYY${1}MM${2}DD";
2697 0           $fmt .= time_mask();
2698             #die "Can not determine date mask for $str ($fmt)" if length($_);
2699 0 0         return if length($_);
2700 0           return $fmt;
2701             }
2702              
2703             # Allow day abbreviation (Mon Tue etc.)
2704 0 0         $fmt .= "DY " if s/^$day_re\s+//;
2705              
2706             # Jan 23 2010
2707 0 0         if ( s/^$mon_re\s+\d+// ) {
2708 0           my $end_year;
2709 0           $fmt .= "MON DD";
2710 0 0         if ( s/^\s\d{4}// ) {
    0          
2711 0           $fmt .= " YYYY";
2712             } elsif ( s/\s+\d{4}$// ) {
2713 0           $end_year++;
2714             } else {
2715             #die "Can not determine date mask for $str ($fmt)";
2716 0           return;
2717             }
2718 0           $fmt .= time_mask();
2719              
2720             #die "Can not determine date mask for $str ($fmt)" if length($_);
2721 0 0         return if length($_);
2722 0 0         $fmt .= " YYYY" if $end_year;
2723 0           return $fmt;
2724             }
2725              
2726             # January 23, 2010
2727 0 0         if ( s/^$month_re\s+\d+// ) {
2728 0           my $end_year;
2729 0           $fmt .= "MONTH DD";
2730 0 0         if ( s/^(\W?)\s\d{4}// ) {
    0          
2731 0           my $comma = $1;
2732 0           $fmt .= "$comma YYYY";
2733             } elsif ( s/\s+\d{4}$// ) {
2734 0           $end_year++;
2735             } else {
2736             #die "Can not determine date mask for $str ($fmt)";
2737 0           return;
2738             }
2739 0           $fmt .= time_mask();
2740              
2741             #die "Can not determine date mask for $str ($fmt)" if length($_);
2742 0 0         return if length($_);
2743 0 0         $fmt .= " YYYY" if $end_year;
2744 0           return $fmt;
2745             }
2746              
2747             # 02-Jan-2010
2748 0 0         if ( s/^\d\d?(\D?)$mon_re(\D?)\d{4}// ) {
2749 0           $fmt .= "DD${1}MON${2}YYYY";
2750 0           $fmt .= time_mask();
2751             #die "Can not determine date mask for $str ($fmt)" if length($_);
2752 0 0         return if length($_);
2753 0           return $fmt;
2754             }
2755              
2756             # 02-Jan-10
2757 0 0         if ( s/^\d\d?(\D?)$mon_re(\D?)\d\d?// ) {
2758 0           $fmt .= "DD${1}MON${2}$year2mask";
2759 0           $fmt .= time_mask();
2760             #die "Can not determine date mask for $str ($fmt)" if length($_);
2761 0 0         return if length($_);
2762 0           return $fmt;
2763             }
2764              
2765             # MM/DD/YYYY
2766 0 0         if ( s|^\d\d?(\D)\d\d?(\D)\d{4}|| ) {
2767 0           $fmt .= "MM${1}DD${2}YYYY";
2768 0           $fmt .= time_mask();
2769             #die "Can not determine date mask for $str ($fmt)" if length($_);
2770 0 0         return if length($_);
2771 0           return $fmt;
2772             }
2773              
2774             #die "Failure to determine date mask for $str";
2775 0           return;
2776             }
2777             }
2778              
2779             # Operates on and modifies current $_
2780             sub time_mask {
2781 0     0     my $fmt = '';
2782 0 0         if ( s/^(\D?)[\s\d]\d// ) {
2783 0           my $sep = $1;
2784 0 0         $sep = qq("$sep") if $sep =~ /\S/;
2785 0           $fmt .= "${sep}HH";
2786 0 0         $fmt .= /[AP]M\b/i ? "12" : "24";
2787 0 0         if ( s/^(\D)\d\d// ) {
2788 0           $fmt .= "${1}MI";
2789 0 0         if ( s/^(\D)\d\d// ) {
2790 0           $fmt .= "${1}SS";
2791 0 0         if ( s/^(\D)(\d+)// ) {
2792 0           $fmt .= $1 . "FF" . length($2);
2793             }
2794             }
2795             }
2796 0 0         if ( s/^(\s?)[AP]M// ) {
2797 0           $fmt .= "${1}AM";
2798             }
2799 0 0         if ( s/^(\s*)\w{2,3}T//i ) {
2800 0           $fmt .= "${1}TZD";
2801             }
2802 0 0         if ( s/^\s[+-]\d\d(\D)\d\d// ) {
2803 0           $fmt .= " TZH${1}TZM";
2804             }
2805             }
2806 0           return $fmt;
2807             }
2808              
2809             {
2810             my %type_map = ( TABLE => 'T', VIEW => 'V', PROCEDURE => 'P' );
2811              
2812             sub obj_type {
2813 0     0     my ( $self, $name ) = @_;
2814 0           $name = uc($name);
2815 0           my $type;
2816 0 0         if ( $name =~ /^([^.]+)\.(.+)/ ) {
2817 0           my ($schema, $table) = ($1, $2);
2818             $type = $self->{DBH}->selectrow_array(
2819 0           "select object_type from all_objects where owner = ? and object_name = ?",
2820             undef,
2821             $schema,
2822             $table,
2823             );
2824             } else {
2825             $type = $self->{DBH}->selectrow_array(
2826 0           "select object_type from user_objects where object_name = ?",
2827             undef,
2828             $name
2829             );
2830             }
2831 0 0         return unless $type;
2832 0   0       return $type_map{$type} || confess "Don't know about type $type for object $name";
2833             }
2834             }
2835              
2836             sub curr_schema {
2837 0     0     my $self = shift;
2838 0           return $self->get("sys_context('USERENV', 'SESSION_SCHEMA')");
2839             }
2840              
2841             {
2842             my $sql_t = <
2843             SELECT
2844             b.index_name,
2845             b.column_name
2846             FROM all_indexes a, all_ind_columns b
2847             WHERE a.owner = b.index_owner
2848             AND a.index_name = b.index_name
2849             AND a.table_owner = %s
2850             AND a.table_name = %s
2851             SQL
2852              
2853              
2854             sub index_info {
2855 0     0     my ( $self, $table, $all_indexes ) = @_;
2856              
2857 0           my $dbh = $self->{DBH};
2858 0           my ( $schema, $tbl ) = split /\./, uc($table);
2859 0 0         if ( !$tbl ) {
2860 0           $tbl = $schema;
2861 0           $schema = $self->curr_schema();
2862             }
2863 0           my $sql = sprintf $sql_t, $dbh->quote($schema), $dbh->quote($tbl);
2864 0 0         $sql .= "and a.uniqueness = 'UNIQUE'\n" unless $all_indexes;
2865 0           $sql .= "ORDER BY b.column_position\n";
2866 0           my $sth = $dbh->prepare($sql);
2867 0           $sth->execute();
2868 0           my @col_names = @{$sth->{NAME_lc}};
  0            
2869 0           my %row; $sth->bind_columns(\@row{@col_names});
  0            
2870 0           my %ind;
2871 0           while ($sth->fetch()) {
2872 0           push @{$ind{$row{index_name}}}, lc($row{column_name});
  0            
2873             }
2874 0 0         return unless %ind;
2875 0           return \%ind;
2876             }
2877             }
2878              
2879             sub primary_key {
2880 0     0     my ($self, $table) = @_;
2881 0           $table = uc($table);
2882 0           my ($schema, $tbl) = split /\./, $table;
2883 0 0         if ( !$tbl ) {
2884 0           $tbl = $schema;
2885 0           $schema = $self->curr_schema();
2886             }
2887 0           my @pk = map lc, $self->{DBH}->primary_key(undef, $schema, $tbl);
2888 0 0         return unless @pk;
2889 0           return \@pk;
2890             }
2891              
2892             {
2893              
2894             my $sql = <
2895             MERGE %s INTO %s d
2896             USING %s s
2897             ON (%s)
2898             WHEN MATCHED THEN UPDATE SET %s
2899             WHEN NOT MATCHED THEN INSERT (%s)
2900             VALUES (%s)
2901             SQL
2902              
2903             sub merge {
2904 0     0     my $self = shift;
2905 0           my %args = @_;
2906              
2907 0           my $dbh = $self->{DBH};
2908 0           my $table = $args{Table};
2909 0           my $stg_table = $args{StgTable};
2910              
2911 0           my $stg_info = $self->column_info($stg_table);
2912 0           my $stg_map = $stg_info->{MAP};
2913 0           my %stg_has; $stg_has{$_}++ for @{$stg_info->{LIST}};
  0            
  0            
2914              
2915 0 0 0       my $key_col_ref = ($args{KeyCols} && @{$args{KeyCols}}) ? $args{KeyCols} : $self->key_columns($table);
2916 0 0 0       my $upd_col_ref = ($args{UpdCols} && @{$args{UpdCols}}) ? $args{UpdCols} : $self->upd_columns($table);
2917              
2918             # Normalize all columns and maps to lowercase
2919 0           my @key_cols = map lc, @$key_col_ref;
2920 0           my @upd_cols = map lc, @$upd_col_ref;
2921 0           my @fields = (@key_cols, @upd_cols);
2922             my %col_map = $args{ColMap}
2923 0 0         ? map lc, %{$args{ColMap}}
  0            
2924             : ();
2925              
2926             my $upd_col_str = join(",", map {
2927 0           $col_map{$_} ? $stg_has{$col_map{$_}} ? "d.$_=s.$col_map{$_}" : "d.$_=$col_map{$_}"
2928 0 0         : $stg_has{$_} ? "d.$_=s.$_" : ()
    0          
    0          
2929             } @upd_cols),
2930              
2931             # Determine if last_chg_user, last_chg_date need to be updated
2932             # If staging table does not have the columns, and the target table does
2933             # Then default the values
2934             my %chg_col = $self->last_chg_list($table, \@fields);
2935 0           delete $chg_col{$_} for grep $stg_has{$_}, qw(last_chg_user last_chg_date);
2936 0           for my $col ( sort { $b cmp $a } keys %chg_col ) {
  0            
2937             $upd_col_str .= ",$col=".( ($col eq 'last_chg_user')
2938 0 0         ? "'".uc(substr($dbh->{Username}, 0, $chg_col{$col}))."'"
2939             : 'SYSTIMESTAMP'
2940             );
2941             }
2942              
2943 0 0         my $parallel = $args{Parallel} ? '/* parallel(8) append */' : '';
2944              
2945             my $merge_sql = sprintf($sql,
2946             $parallel,
2947             $table,
2948             $args{MergeFilter} ? "$args{StgTable} WHERE $args{MergeFilter}": $args{StgTable},
2949             join(" AND ", map "d.$_=s.".($col_map{$_}||$_), @key_cols),
2950             $upd_col_str,
2951             join(",", @fields),
2952             join(",", map {
2953 0 0 0       $col_map{$_} ? $stg_has{$col_map{$_}} ? "s.$col_map{$_}" : $col_map{$_}
2954 0 0         : $stg_has{$_} ? "s.$_" : "NULL"
    0          
    0          
2955             } @fields),
2956             );
2957              
2958             # No update if no update columns
2959 0 0         $merge_sql =~ s/^WHEN MATCHED.*\n//m unless @upd_cols;
2960 0           print("Executing: $merge_sql\n");
2961 0 0         return 1 if $args{NoExec};
2962              
2963 0 0         $dbh->do("ALTER SESSION ENABLE PARALLEL DML") if $args{Parallel};
2964              
2965 0           my $rows = $dbh->do($merge_sql) + 0;
2966 0           print("$rows rows updated/inserted\n\n");
2967 0           return $rows;
2968              
2969             }
2970             }
2971              
2972             # #!!!UNFINISHED!!!
2973             # Static block for mk_ext_table
2974             {
2975              
2976             my $sql = <
2977             CREATE TABLE %s (
2978             %s
2979             )
2980             ORGANIZATION EXTERNAL (
2981             TYPE oracle_loader
2982             DEFAULT DIRECTORY %s
2983             ACCESS PARAMETERS
2984             (
2985             RECORDS DELIMITED BY NEWLINE
2986             LOGFILE 'TEST.log'
2987             FIELDS TERMINATED BY '%s'
2988             )
2989             LOCATION ('%s')
2990             )
2991             SQL
2992              
2993             sub mk_ext_table {
2994 0     0     my $self = shift;
2995              
2996 0           my %args = @_;
2997              
2998 0 0         my $table = $args{Table} or confess "Need table prototype for external table";
2999              
3000 0   0       my $ext_table = $args{Name} || "ext_${table}$$";
3001 0 0         my $dir = $args{Dir} or confess "Need directory for external table $table";
3002 0 0         my $file = $args{File} or confess "Need file for external table $table";
3003              
3004 0           my $cols = $self->column_info($table);
3005 0           my $cmap = $cols->{MAP};
3006              
3007 0           my @col_list;
3008 0           for my $col (@{$cols->{LIST}}) {
  0            
3009 0           my $col_str = $col;
3010              
3011 0           my $cdata = $cmap->{$col};
3012 0           my $type = $cdata->{TYPE_NAME};
3013 0           my $dec = $cdata->{DECIMAL_DIGITS};
3014              
3015 0           $col_str .= " $type";
3016 0           my $size = $cdata->{COLUMN_SIZE};
3017              
3018 0           for ($type) {
3019 0 0         $col_str .=
    0          
    0          
3020             /CHAR/ ? "($size)"
3021             : /NUMBER/ ? (defined $dec) ? "($size,$dec)" : ''
3022             : '';
3023             }
3024              
3025             #$col_str .= " DEFAULT $cdata->{COLUMN_DEF}" if defined $cdata->{COLUMN_DEF};
3026             #$col_str =~ s/\s+$//;
3027             #$col_str .= " NOT NULL" unless $cdata->{NULLABLE};
3028              
3029 0           push @col_list, $col_str;
3030             }
3031              
3032             my $create_sql = sprintf($sql,
3033             $ext_table,
3034             join(",\n", @col_list ),
3035             $dir,
3036             #$args{RowDelimiter} || "\\n",
3037 0   0       $args{Delimiter} || "|",
3038             $file,
3039             );
3040              
3041 0           $self->{DBH}->do($create_sql);
3042              
3043 0           return $ext_table;
3044             }
3045             }
3046              
3047             package DBIx::BulkUtil::Release;
3048              
3049             sub new {
3050 0     0     my ($class, $f) = @_;
3051 0           bless $f, $class;
3052             }
3053              
3054 0     0     sub DESTROY { $_[0]->() }
3055              
3056             1;
3057              
3058             __END__