File Coverage

blib/lib/DBIx/BulkUtil.pm
Criterion Covered Total %
statement 45 1584 2.8
branch 0 910 0.0
condition 0 438 0.0
subroutine 15 119 12.6
pod 8 12 66.6
total 68 3063 2.2


line stmt bran cond sub pod time code
1             package DBIx::BulkUtil;
2              
3 1     1   23927 use DBI;
  1         19578  
  1         65  
4 1     1   8 use Carp qw(confess);
  1         1  
  1         59  
5              
6 1     1   4 use strict;
  1         6  
  1         20  
7 1     1   4 use warnings;
  1         1  
  1         1550  
8              
9             our $VERSION = '0.04';
10              
11             # Override this
12             sub passwd {
13 0     0 1   return '';
14             }
15              
16             # Override this
17             sub user {
18 0     0 0   return '';
19             }
20              
21             {
22             my @connect_options = qw(
23             Server
24             Database
25             Env
26             Type
27             User
28             Password
29             DataDir
30             ConnectMethod
31             RetryCount
32             RetryMinutes
33             BulkLogin
34             NoBlankNull
35             Silent
36             NoCharset
37             NoServer
38             Dsl
39             DslOptions
40             DateFormat
41             DatetimeFormat
42             DatetimeTzFormat
43             );
44             my %is_valid;
45             $is_valid{$_}++ for @connect_options;
46              
47             sub _options_valid {
48 0     0     my $class = shift;
49 0           my %opts = @_;
50 0           for my $opt (keys %opts) {
51 0 0         return $opt if !$is_valid{$opt};
52             }
53 0           return;
54             }
55             }
56              
57             # Override this to set server, db, env, type based on whatever
58             sub env2db {
59 0     0 0   my ($self,$args) = @_;
60              
61 0 0 0       $args->{Type} ||= (!$args->{Server} && $args->{Database} ) ? 'Oracle' : 'Sybase';
      0        
62 0 0         if ( $args->{Type} eq 'SybaseIQ' ) {
63 0           $args->{IsIQ}++;
64 0           $args->{Type} = 'Sybase'
65             }
66             }
67              
68             sub connect {
69 0     0 1   my $class = shift;
70              
71             # Use HandleError sub instead of more straightforward RaiseError
72             # attribute because Sybase 1.09 does not include line numbers in its
73             # RaiseError die messages. And a stacktrace is usually more helpful
74             # anyway.
75 0           my $dbi_opts = {
76             ChopBlanks => 1,
77             AutoCommit => 1,
78             PrintError => 0,
79             RaiseError => 1,
80             LongReadLen => 1_024 * 1_024,
81             };
82              
83 0 0 0       if ( @_ and ref($_[-1]) ) {
84 0           my $tmp_opts = pop @_;
85 0           @$dbi_opts{keys %$tmp_opts} = values %$tmp_opts;
86             }
87 0           my $bad_opt = $class->_options_valid(@_);
88 0 0         die "Invalid option $bad_opt to ${class}->connect" if $bad_opt;
89              
90             # TODO: Log or Output option?
91 0           my %args = @_;
92 0           my $fh;
93 0 0         open($fh, ">", "/dev/null") if $args{Silent};
94 0 0         my $stdout = $args{Silent} ? $fh : \*STDOUT;
95 0           local *STDOUT = $stdout;
96              
97 0   0       my $connect = $args{ConnectMethod} || 'connect';
98              
99 0           $class->env2db( \%args );
100              
101             my @dsl_args = $args{Dsl}
102 0           ? ref($args{Dsl}) ? @{$args{Dsl}} : $args{Dsl}
103 0 0         : ();
    0          
104              
105 0           my $type = $args{Type};
106 0           my $database = $args{Database};
107 0   0       my $server = $args{Server} || '';
108              
109 0 0         if (!@dsl_args) {
110 0 0         if ( $type eq 'Sybase' ) {
    0          
111             # Server-side charset is iso, need to specify it as client-side charset
112             # or else we get utf8 to iso charset conversion error when database handle is cloned
113             # (which happens automatically when you need multiple active statement handles).
114 0 0         push @dsl_args, "server=$server" unless $args{NoServer};
115 0 0         push @dsl_args, 'charset=iso_1' unless $args{NoCharset};
116 0 0         push @dsl_args, 'bulkLogin=1' if $args{BulkLogin};
117             } elsif ( $type eq 'Oracle' ) {
118 0 0         push @dsl_args, $database unless $args{NoServer};
119             } else {
120 0           die "Unable to connect to database type $type";
121             }
122             }
123              
124             # For Xtra Dsl options
125             push @dsl_args, ref($args{DslOptions})
126 0           ? @{$args{DslOptions}} : $args{DslOptions}
127 0 0         if $args{DslOptions};
    0          
128              
129 0           my $dsl = "dbi:$type:";
130 0           $dsl .= join( ";", @dsl_args );
131              
132 0   0       my $user = $args{User} || $class->user(\%args);
133 0   0       my $passwd = $args{Password} || $class->passwd(\%args);
134              
135 0           my $dbh;
136 0   0       my $retry = int($args{RetryCount} || 0);
137              
138 0   0       my $retry_seconds = 60 * ($args{RetryMinutes} || 10);
139 0 0         $retry_seconds = 60 * 10 if $retry_seconds < 0;
140              
141 0 0         my $conn_name = ($type eq 'Sybase') ? $server : $database;
142 0           while (1) {
143              
144 0           print "Connecting to $conn_name\n";
145 0           $dbh = eval { DBI->$connect($dsl, $user, $passwd, $dbi_opts) };
  0            
146 0           my $err = $@;
147 0 0         unless ($dbh) {
148 0 0         die $err unless $retry-- > 0;
149 0           print "Unable to connect to $conn_name. Will retry in $retry_seconds seconds";
150 0           sleep $retry_seconds;
151 0           redo;
152             }
153              
154             # Make selected Sybase database the current database
155             # And make date formats consistent
156 0 0         if ( $type eq 'Sybase' ) {
    0          
157             # Switch database after connect so that we get a helpful error message
158             # and an error instead of a warning
159 0 0         if ($database) {
160 0           print "Using $database database\n";
161 0           my $result = eval { $dbh->do("USE $database") };
  0            
162 0           my $err = $@;
163 0 0         unless ($result) {
164 0 0         die $err unless $retry-- > 0;
165 0           $dbh->disconnect();
166 0           print "Unable to use database $database on $server. Will retry in $retry_seconds seconds\n";
167 0           sleep $retry_seconds;
168 0           redo;
169             }
170             };
171 0   0       $dbh->{syb_date_fmt} = $args{DateFormat} || 'ISO';
172              
173 0 0 0       $dbh->do("set temporary option Load_ZeroLength_AsNULL = 'ON'") if $args{IsIQ} and !$args{NoBlankNull};
174             } elsif ( $type eq 'Oracle' ) {
175             # Fractions on Oracle "DATE" format not allowed
176 0   0       my $date_fmt = $args{DateFormat} || 'YYYY-MM-DD HH24:MI:SS';
177 0   0       my $datetime_fmt = $args{DatetimeFormat} || 'YYYY-MM-DD HH24:MI:SS.FF';
178 0   0       my $datetime_tz_fmt = $args{DatetimeTzFormat} || $args{DatetimeFormat} || $datetime_fmt;
179 0           $_ = $dbh->quote($_) for $date_fmt, $datetime_fmt, $datetime_tz_fmt;
180 0           $dbh->do("alter session set nls_date_format=$date_fmt");
181 0           $dbh->do("alter session set nls_timestamp_format=$datetime_fmt");
182 0           $dbh->do("alter session set nls_timestamp_tz_format=$datetime_tz_fmt");
183             }
184 0           last;
185             }
186              
187             # We do not want stack trace on connect so that we do not expose password
188             # But everywhere else it is useful
189 0           $dbh->{RaiseError} = 0;
190 0     0     $dbh->{HandleError} = sub { confess $_[0] };
  0            
191 0 0         return $dbh unless wantarray;
192 0           my $util = DBIx::BulkUtil::Obj->new($dbh, $passwd, \%args);
193 0           return $dbh, $util;
194             }
195              
196             # Just set the connect method and call connect()
197             sub connect_cached {
198 0     0 1   my $class = shift;
199 0           my @args = $class->override({ConnectMethod => 'connect_cached'}, @_);
200 0           $class->connect(@args);
201             }
202              
203             sub syb_connect {
204 0     0 1   my $class = shift;
205 0           my @args = $class->override({ Type => 'Sybase' }, @_);
206 0           return $class->connect(@args);
207             }
208              
209             sub syb_connect_cached {
210 0     0 0   my $class = shift;
211 0           my @args = $class->override({ Type => 'Sybase' }, @_);
212 0           return $class->connect_cached(@args);
213             }
214              
215             sub ora_connect {
216 0     0 1   my $class = shift;
217 0           my @args = $class->override({ Type => 'Oracle' }, @_);
218 0           return $class->connect(@args);
219             }
220              
221             sub ora_connect_cached {
222 0     0 1   my $class = shift;
223 0           my @args = $class->override({ Type => 'Oracle' }, @_);
224 0           return $class->connect_cached(@args);
225             }
226              
227             sub iq_connect {
228 0     0 1   my $class = shift;
229 0           my @args = $class->override({ Type => 'SybaseIQ' }, @_);
230 0           return $class->connect(@args);
231             }
232              
233             sub iq_connect_cached {
234 0     0 1   my $class = shift;
235 0           my @args = $class->override({ Type => 'SybaseIQ' }, @_);
236 0           return $class->connect_cached(@args);
237             }
238              
239              
240             # Overriden connect args need to be spliced in before any dbi options
241             sub override {
242 0     0 0   my $self = shift;
243 0           my ($ovr, @args) = @_;
244 0 0         if ( (@args % 2) == 0 ) {
245 0           return @args, %$ovr;
246             }
247 0           my $dbi_opts = pop @args;
248 0 0 0       die "Last argument to connect must be hash reference" unless $dbi_opts and ref($dbi_opts);
249 0           return @args, %$ovr, $dbi_opts;
250             }
251              
252             package DBIx::BulkUtil::Obj;
253              
254             our $BCP_DELIMITER = '|';
255              
256 1     1   1259 use Memoize qw(memoize);
  1         2447  
  1         59  
257 1     1   5 use Carp qw(confess);
  1         2  
  1         1002  
258              
259             sub new {
260 0     0     my ( $class, $dbh, $passwd, $args ) = @_;
261 0           my $type = $dbh->{Driver}{Name};
262 0 0         if ($type eq 'Sybase') {
263 0           (my $version = $dbh->{syb_server_version_string}) =~ s|/.*||;
264 0 0         $type = 'SybaseIQ' if $version =~ /IQ/;
265             }
266 0 0         $class =~ s/::Obj$// or die "Invalid class $class";
267              
268 0           $class .= "::" . $type;
269 0           return $class->util($dbh, $passwd, $args);
270             }
271              
272             sub util {
273 0     0     my $class = shift;
274 0           my ($dbh, $pw, $args) = @_;
275 0 0         confess "Must use subclass of this package" if __PACKAGE__ eq $class;
276 0           my %util_args;
277              
278 0 0 0       if ( $args and ref($args) ) {
279 0 0         $util_args{NoBlankNull} = 1 if $args->{NoBlankNull};
280             }
281              
282             # Prevent dbh from disconnecting after fork in child processes
283 0           my $dbh_pid = $$;
284 0 0   0     my $release = DBIx::BulkUtil::Release->new(sub { $dbh->{InactiveDestroy} = 1 if $dbh_pid != $$ });
  0            
285 0           bless { DBH => $dbh, PASSWORD => $pw, DELIMITER => $BCP_DELIMITER, RELEASE => $release, %util_args }, $class;
286             }
287              
288 0     0     sub dbh { $_[0]->{DBH} }
289              
290             sub get {
291 0     0     my $self = shift;
292 0           my $select = shift;
293 0           my $dbh = $self->{DBH};
294 0           my @result = $dbh->selectrow_array( $self->row_select($select) );
295 0 0         return $result[0] if @result == 1;
296 0           return @result;
297             }
298              
299             sub exec_sp {
300 0     0     my $self = shift;
301 0           my $dbh = $self->{DBH};
302 0           $dbh->do($self->sp_sql(@_));
303             }
304              
305             sub bcp_out {
306 0     0     my $self = shift;
307 0           my $opts = {};
308 0 0         if (ref $_[-1]) {
309 0           $opts = pop @_;
310             }
311 0           my ( $table, $file ) = @_;
312 0   0       $file ||= "$table.bcp";
313              
314 0   0       my $delimiter = $opts->{Delimiter} || $self->{DELIMITER};
315 0   0       my $row_delim = $opts->{RowDelimiter} || $/;
316 0 0         my @esc = ( escape_char => $opts->{EscapeChar} ) if $opts->{EscapeChar};
317              
318             # Default to no quote char to be more compatible w/Sybase
319 0 0         my @quote_char = $opts->{QuoteFields} ? () : ( quote_char => undef, escape_char => undef );
320              
321             # TODO: Give up on Text::CSV ??
322 0           my $csv;
323 0 0         if ( length($delimiter) == 1 ) {
324 0           require Text::CSV_XS;
325 0           $csv = Text::CSV_XS->new({
326             binary => 1,
327             eol => $row_delim,
328             sep_char => $delimiter,
329             @esc,
330             @quote_char,
331             });
332             }
333              
334 0 0         my $col_list = $opts->{Columns} ? $opts->{Columns} : "*";
335              
336             # Only for HP?
337             #local $ENV{NLS_LANG} = "AMERICAN_AMERICA.WE8ROMAN8";
338              
339 0   0       my $enc_opt = $opts->{Encoding} || '';
340 0           my $db_type = $self->type();
341 0 0         if ( $db_type eq 'Oracle' ) {
342              
343 0 0         my $partition = ( $table =~ s/:(\w+)$// ) ? $1 : '';
344 0   0       my $nls_lang = $ENV{NLS_LANG} || '';
345 0 0         if ( $nls_lang =~ /utf8/i ) {
346 0   0       $enc_opt ||= 'utf8';
347             }
348 0 0         if ( $col_list eq '*' ) {
349 0           my @col_list;
350 0           my $col_info = $self->column_info($table);
351 0           my $list = $col_info->{LIST};
352 0           my $col_map = $col_info->{MAP};
353 0           my $xml_cnt;
354 0           for my $col (@$list) {
355 0 0         if ( $col_map->{$col}{TYPE_NAME} eq 'XMLTYPE' ) {
356 0           $xml_cnt++;
357 0           push @col_list, "XMLType.getclobval($col)";
358 0           next;
359             }
360 0           push @col_list, $col;
361             }
362 0 0         if ($xml_cnt) {
363 0           $col_list = join(",", @col_list);
364             }
365             }
366 0 0         $table = "$table PARTITION ($partition)" if $partition;
367             }
368 0 0         my $enc = $enc_opt ? ":encoding($enc_opt)" : '';
369              
370 0 0         open(my $fh, ">$enc", $file) or confess "Can not write to $file: $!";
371 0           my $sql = "SELECT $col_list FROM $table\n";
372 0 0         $sql .= $opts->{Filter} if $opts->{Filter};
373              
374 0           my $dbh = $self->{DBH};
375 0           my $sth = $dbh->prepare($sql);
376 0 0         $sth->{ChopBlanks} = 0 unless $opts->{TrimBlanks};
377 0           $sth->execute();
378 0 0         if ($opts->{Header}) {
379 0 0         if ($csv) {
380 0           $csv->print($fh, $sth->{NAME_lc});
381             } else {
382 0           print $fh join( $delimiter, @{$sth->{NAME_lc}} ), $row_delim;
  0            
383             }
384             }
385              
386 0           my $cnt = 0;
387 0           while ( my $row = $sth->fetchrow_arrayref() ) {
388 1     1   6 no warnings 'uninitialized';
  1         1  
  1         220  
389 0 0         if ($csv) {
390 0           $csv->print($fh, $row);
391             } else {
392 0           print $fh join($delimiter, @$row), $row_delim;
393             }
394 0           $cnt++;
395             }
396 0           close $fh;
397 0           return $cnt;
398             }
399              
400             {
401 1     1   4 no warnings 'once';
  1         2  
  1         4655  
402             *select2file = \&bcp_out;
403             }
404              
405             sub bcp_file {
406 0     0     my ($self, $file_in, $file_out) = @_;
407 0           my $opts = {};
408 0 0         if (ref $_[-1]) {
409 0           $opts = pop @_;
410             }
411              
412 0   0       my $delimiter = $opts->{Delimiter} || $self->{DELIMITER};
413 0   0       my $esc = $opts->{EscapeChar} || "\\";
414              
415 0 0         my @quote_char = $opts->{QuoteFields} ? () : ( quote_char => undef );
416 0           require Text::CSV_XS;
417 0           my $csv = Text::CSV_XS->new({
418             binary => 1,
419             eol => $/,
420             sep_char => $delimiter,
421             escape_char => $esc,
422             @quote_char,
423             });
424              
425 0 0         open(my $in_fh, "<", $file_in) or die "Err: $!";
426 0 0         open(my $out_fh, ">", $file_out) or die "Err: $!";
427 0           my $hdr = $csv->getline($in_fh);
428 0           $csv->column_names($hdr);
429 0 0         my @drop_cols = $opts->{DropCols} ? @{$opts->{DropCols}} : ();
  0            
430 0           my %drop; $drop{$_}++ for @drop_cols;
  0            
431              
432             my @cols =
433 0           $opts->{KeepCols} ? @{$opts->{KeepCols}}
434 0 0         : @drop_cols ? grep !$drop{$_}, @$hdr
    0          
435             : @$hdr;
436 0           my %hdr_idx; @hdr_idx{@$hdr} = 0..$#$hdr;
  0            
437              
438 0 0         $csv->print($out_fh, [@$hdr[@hdr_idx{@cols}]]) if $opts->{Header};
439 0           while ( my $row = $csv->getline_hr($in_fh) ) {
440 0           $csv->print($out_fh, [@$row{@cols}]);
441             }
442 0           close $in_fh;
443 0           close $out_fh;
444             }
445              
446             sub add_header {
447 0     0     my ($self, $table, $file, $opts) = @_;
448 0   0       $opts ||= {};
449              
450 0           my $cols;
451 0 0 0       if ( $opts->{Header} || $opts->{Columns} ) {
452             my $sel_str =
453             $opts->{Columns}
454             ? ref($opts->{Columns})
455 0           ? join(",", @{$opts->{Columns}})
456             : $opts->{Columns}
457 0 0         : '*';
    0          
458 0           my $sth = $self->{DBH}->prepare("SELECT $sel_str FROM $table WHERE 1=0");
459 0           $sth->execute();
460 0           $cols = $sth->{NAME_lc};
461 0           $sth->finish();
462             }
463              
464 0 0         return $self->add_quotes($table, $file, $cols, $opts) if $opts->{QuoteFields};
465              
466             # If quotes are not required, this is more efficient
467             # I doubt anyone uses either option anyway
468             # but highly doubt anyone uses the quoting
469 0           require File::Copy;
470 0   0       my $d = $opts->{Delimiter} || $self->{DELIMITER};
471 0   0       local $/ = $opts->{RowDelimiter} || "\n";
472              
473 0 0         open(my $fh, ">", "$file.bak") or die "Failed to open $file.bak: $!";
474              
475             # Unbuffer the filehandle for printing header
476             # because File::Copy uses unbuffered syswrite
477             # $fh->flush() after the print would also work depending on
478             # version of perl and whether IO::Handle is loaded
479 0           for ( select $fh ) { $| = 1; select $_ }
  0            
  0            
480 0           print $fh join($d, @$cols), $/;
481              
482 0 0         File::Copy::copy($file, $fh) or die "Failed to copy $file to $file.bak: $!";
483 0           close $fh;
484              
485 0           return "$file.bak";
486             }
487              
488             sub add_quotes {
489 0     0     my ($self, $table, $file, $cols, $opts) = @_;
490 0   0       my $d = $opts->{Delimiter} || $self->{DELIMITER};
491 0           my $dre = quotemeta($d);
492              
493 0           local ($_, $., $ARGV, *ARGV);
494 0           local ( $^I, @ARGV ) = ( '.bak', $file );
495 0   0       local $/ = $opts->{RowDelimiter} || "\n";
496 0           my $done;
497 0           while ( <> ) {
498 0 0 0       print join($d, @$cols), $/ if !$done++ && $opts->{Header};
499              
500 0 0         if ($opts->{QuoteFields}) {
501 0           chomp;
502 0           my @fields = split /$dre/;
503 0   0       /\s/ and $_ = qq("$_") for @fields;
504 0           $_ = join($d, @fields) . $/;
505             }
506 0           print;
507             }
508 0           return "$file.bak";
509             }
510              
511             sub type {
512 0     0     my $self = shift;
513 0           return $self->{DBH}{Driver}{Name};
514             }
515              
516             # Because of Sybase and its stupid mixed case column names,
517             # we need to be able to find the actual cased name for a given
518             # uncased column name.
519             # Just pray that there are not two columns with the same name
520             # in the same table that are differently cased.
521             memoize('column_info');
522              
523             sub column_info {
524             my $self = shift;
525             my $table = shift;
526              
527             my $schema;
528             my $dbtype = $self->type();
529             my ($tmp_db, $curr_db) = (undef,'');
530             my $dbh = $self->{DBH};
531             my %col_dflt;
532             if ( $dbtype eq 'Oracle' ) {
533             $table = uc($table);
534             if ( $table =~ /^(\w+)\.(\w+)$/ ) {
535             ($schema, $table) = ($1,$2);
536             } else { $schema = $self->curr_schema() }
537             } elsif ( $dbtype eq 'Sybase' ) {
538              
539             $tmp_db = $curr_db = $self->curr_db();
540              
541             if ( $table =~ /^#/ ) {
542             $table = $self->temp_table_name($table);
543             }
544              
545             if ( $table =~ /^(?:(\w+)\.)?(\w*)\.(#?\w+)$/ ) {
546             ($tmp_db, $schema, $table) = ($1,$2,$3);
547             $schema ||= undef;
548              
549             # We can only get column info on the current database
550             $dbh->do("USE $tmp_db") if defined($tmp_db) and $tmp_db ne $curr_db;
551             }
552              
553             $schema ||= '%';
554              
555             # Sybase gets metadata through a (under the DBD hood) stored proc, but does not return defaults.
556             # So get defaults here.
557             my $sth = $dbh->prepare( sprintf( $self->default_sql(), $table ) );
558             $sth->execute();
559             $sth->bind_columns( \my ( $col_name, $default ) );
560             while ( $sth->fetch ) {
561             $col_dflt{$col_name} .= $default;
562             }
563             }
564              
565             my $sth = $self->{DBH}->column_info($tmp_db, $schema, $table, '%');
566             my @names = @{$sth->{NAME_uc}};
567             my %row; $sth->bind_columns(\@row{@names});
568             my @list;
569             my %col_map;
570             my $col_cnt = 0;
571             while ( $sth->fetch() ) {
572              
573             # Data is probably in order, but we are not guaranteed
574             # So assign by index instead of pushing to array if possible
575             # IQ does not have ORDINAL_POSITION so fall back to select order
576             my $idx = defined($row{ORDINAL_POSITION}) ? $row{ORDINAL_POSITION}-1 : $col_cnt;
577             $col_cnt++;
578              
579             my $name = lc($row{COLUMN_NAME});
580             $list[$idx] = $name;
581             ($row{COLUMN_DEF} = $col_dflt{$name}) =~ s/^default\s*//i if defined($col_dflt{$name}) and !defined($row{COLUMN_DEF});
582             $col_map{$name} = { %row };
583             }
584             $dbh->do("USE $curr_db") if defined($tmp_db) and $tmp_db ne $curr_db;
585              
586             return unless $col_cnt;
587             my %col_info = (
588             LIST => \@list,
589             MAP => \%col_map,
590             );
591             return \%col_info;
592             }
593              
594             sub last_chg_list {
595 0     0     my $self = shift;
596 0           my ($table, $columns) = @_;
597              
598             # Determine if last_chg_user, last_chg_date need to be updated
599 0           my %chg_field = (last_chg_user => 1, last_chg_date => 1);
600 0           delete $chg_field{$_} for map lc, @$columns;
601 0           my %chg_cols;
602 0 0         if (%chg_field) {
603             # Are chg columns in table
604 0           my $col_info = $self->column_info($table);
605 0           my $col_map = $col_info->{MAP};
606 0           for my $c (keys %chg_field) {
607 0 0         $chg_cols{$c} = $col_map->{$c}{COLUMN_SIZE} if $col_map->{$c};
608             }
609             }
610              
611 0           return %chg_cols;
612             }
613              
614             sub key_columns {
615 0     0     my ($self, $table) = @_;
616              
617 0           my $pk = $self->primary_key($table);
618 0 0         return $pk if $pk;
619              
620 0           my $idx = $self->index_info($table);
621 0 0         return unless $idx;
622              
623             # Look for unique indexes with suffixes uk, pk, or key
624 0           my ($pk_name) = sort grep /(?i)(?:[pu]k|key)\d*$/, keys %$idx;
625 0 0         return $idx->{$pk_name} if $pk_name;
626              
627 0           my ($idx_name) = sort keys %$idx;
628 0           return $idx->{$idx_name};
629             }
630              
631             sub upd_columns {
632 0     0     my ($self, $table, $key_cols) = @_;
633              
634 0           my $col_data = $self->column_info($table)->{LIST};
635 0   0       $key_cols ||= $self->key_columns($table);
636 0 0         return unless $key_cols;
637              
638 0           my %is_key_col; $is_key_col{$_}++ for @$key_cols;
  0            
639 0           return [ grep !$is_key_col{$_}, @$col_data ];
640             }
641              
642             sub delete {
643 0     0     my ($self, $table, $where) = @_;
644              
645 0           my $dbh = $self->{DBH};
646 0           my $sql = "DELETE FROM $table";
647 0 0         $sql .= " WHERE $where" if $where;
648              
649 0           my $rows = $dbh->do($sql) + 0;
650              
651 0           print "$rows rows deleted\n";
652 0           return $rows;
653             }
654              
655             # Execute sql with retry on deadlocks
656             sub execute {
657 0     0     my ($self,$sth,@args) = @_;
658              
659             # We can pass a sql statement or a sth
660 0 0         $sth = $self->{DBH}->prepare($sth) if !ref($sth);
661              
662 0           my $retry = 5;
663 0           for (1..$retry) {
664 0           my $status = eval { $sth->execute(@args) };
  0            
665 0 0         return $status if $status;
666 0 0         confess $@ unless $@ =~ /deadlock/i;
667 0           print "Deadlock detected on retry $_ of 5\n";
668 0 0         sleep 2 if $_ < $retry;
669             }
670 0           confess $@;
671             }
672              
673             sub ora_date_fmt {
674              
675             # Not very OO-ish but allow calling the Oracle date mask routine
676             # From any generic utility object
677 0     0     my $self = shift;
678 0           DBIx::BulkUtil::Oracle->date_mask(@_);
679              
680             }
681              
682             sub strptime_fmt {
683 0     0     my ($class, $str, $fmt) = @_;
684 0   0       $fmt ||= DBIx::BulkUtil::Oracle->date_mask($str);
685 0 0         return undef unless $fmt;
686 0           for ($fmt) {
687 0           s/MONTH/%B/;
688 0           s/MON/%b/;
689 0           s/MM/%m/;
690 0           s/DD/%d/;
691 0           s/YYYY/%Y/;
692 0           s/YY/%y/;
693 0           s/RRRR/%Y/;
694 0           s/RR/%y/;
695 0           s/HH24/%H/;
696 0           s/HH(?:12)?/%I/;
697 0           s/MI/%M/;
698 0           s/SS/%S/;
699 0           s/AM/%p/;
700 0           s/DY/%a/;
701 0           s/DAY/%a/;
702 0           s/TZD/%Z/;
703 0           s/TZH.TZD/%z/;
704 0           s/"(.)"/$1/g;
705             }
706 0           return $fmt;
707             }
708              
709             sub blk_prepare {
710 0     0     my ($self, $table, %args) = @_;
711 0   0       my $blk_opts = $args{BlkOpts} || {};
712 0   0       my $commit = $args{CommitSize} || 1000;
713 0           my $con = $args{Constants};
714              
715 0 0         my $col_info = $self->column_info($table) or confess "Table $table does not exist";
716 0           my @col_list = @{$col_info->{LIST}};
  0            
717 0           my $arg_len = @col_list;
718              
719 0           my $col_cnt = @col_list;
720 0           my $sql = "INSERT INTO $table VALUES (" . join(",", ("?") x $col_cnt) . ")";
721 0           my $type = $self->type();
722 0 0         my @blk_opts = ($type eq 'Sybase')
723             ? { syb_bcp_attribs => $blk_opts }
724             : ();
725              
726 0           my $dbh = $self->{DBH};
727 0           my $sth = $dbh->prepare($sql, @blk_opts);
728              
729 0           my ($exec_f,$commit_f,$finish_f);
730 0           my @ex_arg_list = (undef) x @col_list;
731 0           my $cnt = 0;
732 0 0         if ($con) {
733 0           my %const = %$con;
734 0           my @c_list = keys %const;
735              
736 0           my %col_pos;
737 0           @col_pos{@col_list} = 0..$#col_list;
738 0           my %const_pos;
739 0           @const_pos{@c_list} = delete @col_pos{@c_list};
740 0           $arg_len = keys %col_pos;
741              
742             # Create arg array for execute method
743             # Set constants and create sub for all but constant args
744 0           @ex_arg_list[@const_pos{@c_list}] = @const{@c_list};
745 0           my @non_const = sort { $a <=> $b } values %col_pos;
  0            
746 0 0         $sth->{HandleError} = undef if $type eq 'Oracle';
747              
748 0 0         if ($type eq 'Sybase') {
749 0     0     $exec_f = sub { @ex_arg_list[@non_const] = @_; $sth->execute(@ex_arg_list) };
  0            
  0            
750 0     0     $commit_f = sub { $dbh->commit() };
  0            
751 0     0     $finish_f = sub { $dbh->commit(); $sth->finish(); $sth = undef };
  0            
  0            
  0            
752             } else {
753 0     0     $exec_f = sub { my $i=0; push @{$ex_arg_list[$_]}, $_[$i++] for @non_const };
  0            
  0            
  0            
754             $commit_f = sub {
755 0     0     my ($t,$r) = $sth->execute_array({ ArrayTupleStatus => \my @status }, @ex_arg_list);
756 0 0         unless (defined $t) {
757 0           for my $i (0..$#status) {
758 0 0         next unless ref $status[$i];
759 0 0         my @row = map { ref($ex_arg_list[$_]) ? qq('$ex_arg_list[$_][$i]') : $ex_arg_list[$_] } 0..$#ex_arg_list;
  0            
760 0           confess "Error: [$status[$i][1]] inserting [".join(",", @row)."]";
761             }
762             }
763 0           $_ = [] for @ex_arg_list[@non_const];
764 0           $r;
765 0           };
766 0 0   0     $finish_f = sub { $commit_f->() if $cnt > 0 };
  0            
767             }
768             } else {
769 0 0         if ($type eq 'Sybase') {
770 0     0     $exec_f = sub { $sth->execute(@_); '0E0' };
  0            
  0            
771 0     0     $commit_f = sub { $dbh->commit(); $cnt };
  0            
  0            
772 0 0   0     $finish_f = sub { $dbh->commit(); $sth->finish(); $sth = undef; ( $cnt > 0 ) ? $cnt : '0E0' };
  0            
  0            
  0            
  0            
773             } else {
774 0     0     $exec_f = sub { my $i=0; push @{$ex_arg_list[$_]}, $_[$_] for 0..$#ex_arg_list; return '0E0' };
  0            
  0            
  0            
  0            
775             $commit_f = sub {
776 0     0     my ($t,$r) = $sth->execute_array({ ArrayTupleStatus => \my @status }, @ex_arg_list);
777 0 0         unless (defined $t) {
778 0           for my $i (0..$#status) {
779 0 0         next unless ref $status[$i];
780 0           my @row = map { qq('$ex_arg_list[$_][$i]') } 0..$#ex_arg_list;
  0            
781 0           confess "Error: [$status[$i][1]] inserting [".join(",", @row)."]";
782             }
783             }
784 0           $_ = [] for @ex_arg_list;
785 0           $r;
786 0           };
787 0 0   0     $finish_f = sub { ( $cnt > 0 ) ? $commit_f->() : '0E0' };
  0            
788             }
789             }
790              
791 0           bless {
792             CNT => \$cnt,
793             COMMIT_SIZE => $commit,
794             EXEC_FUNC => $exec_f,
795             COMMIT_FUNC => $commit_f,
796             FINISH_FUNC => $finish_f,
797             ARG_LEN => $arg_len,
798             }, "DBIx::BulkUtil::BLK";
799             }
800              
801             sub prepare {
802 0     0     my $self = shift;
803 0           my %opt = @_;
804 0           my $table = $opt{Table};
805 0           my $sql = $opt{Sql};
806 0           my $columns = $opt{Columns};
807 0           my $href = $opt{BindHash};
808 0           my $aref = $opt{BindArray};
809             my $by_name =
810             defined($opt{ByName}) ? $opt{ByName}
811 0 0 0       : ( !$href && !$aref ) ? 0
    0          
    0          
812             : ($self->type() eq 'Sybase' ) ? 0
813             : 1;
814 0 0 0       confess "Can not supply both BindHash and BindArray" if $href && $aref;
815 0 0 0       confess "Can not use BindHash or BindArray without ByName" if ( $href || $aref ) && !$by_name;
      0        
816              
817 0 0 0       confess "Must supply Table or Sql to prepare" unless $table || $sql;
818 0 0 0       confess "Can not supply both Table and Sql to prepare" if $table && $sql;
819              
820 0           my $dflt_col = eval {
821 0 0 0       $columns ||= $self->column_info($table)->{LIST} if $table;
822 0           1;
823             };
824 0 0         confess "Table $table not found in datbase" unless $dflt_col;
825              
826 0 0 0       if ( $columns && @$columns ) {
827              
828             # A little overkill to get a nicely formatted SQL statement
829 0 0         my $c_sep = ( @$columns > 5 ) ? "\n" : '';
830 0           my $cnt;
831 0 0   0     my $c_ind = ( @$columns > 5 ) ? sub { ' ' } : sub { $cnt++ ? ' ' : '' };
  0 0          
  0            
832              
833 0           my $h_cnt;
834 0 0   0     my $h_ind = ( @$columns > 5 ) ? sub { ' ' } : sub { $h_cnt++ ? ' ' : '' };
  0 0          
  0            
835              
836 0 0         my $v_sep = $by_name ? ( @$columns > 5 ) ? "\n" : '' : '';
    0          
837 0 0   0     my $hold = $by_name ? sub { $h_ind->() . ":$_" } : sub { "?" };
  0            
  0            
838              
839 0   0       $sql ||= sprintf("INSERT INTO $table ($c_sep%s$c_sep) VALUES ($v_sep%s$v_sep)",
840             join(",$c_sep", map $c_ind->() . $_, @$columns),
841             join(",$v_sep", map $hold->(), @$columns),
842             );
843             }
844 0           print "Preparing: $sql\n";
845 0           my $sth = $self->dbh->prepare($sql);
846              
847 0 0         if ($href) {
    0          
848 0           $sth->bind_param_inout( ":$_" => \$href->{$_}, 0 ) for @$columns;
849             } elsif ($aref) {
850 0           $sth->bind_param_inout( ":$columns->[$_]" => \$aref->[$_], 0 ) for 0..$#$columns;
851             }
852              
853 0           return $sth;
854              
855             }
856              
857             sub prepare_upd {
858              
859 0     0     my $self = shift;
860              
861 0           my %args = @_;
862              
863 0   0       my $table = $args{Table} || die "Must supply Table option";
864              
865 0           my $col_info = $self->column_info($table);
866 0   0       my $col_list = $args{Columns} || $col_info->{LIST};
867              
868 0   0       my $key_cols = $args{KeyCols} || $self->key_columns($table);
869 0   0       my $upd_cols = $args{UpdCols} || $self->upd_columns($table);
870              
871 0           my $sql = <
872             UPDATE $table
873             SET
874 0           @{[ join( ",\n ", map "$_ = ?", @$upd_cols )]}
875             WHERE
876 0           @{[ join( " AND\n ", map "$_ = ?", @$key_cols )]}
877             SQL
878 0           print "Preparing: $sql\n";
879 0           my $sth = $self->{DBH}->prepare($sql);
880              
881 0           my %col_pos;
882 0           my $cnt = 0;
883 0           $col_pos{$_} = $cnt++ for @$col_list;
884              
885 0           my @sth_pos;
886 0           push @sth_pos, $col_pos{$_} for @$upd_cols, @$key_cols;
887              
888             return sub {
889 0 0   0     unless (@_) {
890 0           $sth->finish();
891 0           undef $sth;
892 0           undef @sth_pos;
893 0           return;
894             }
895 0           $sth->execute(@_[@sth_pos]);
896             }
897 0           }
898              
899 0     0     sub is_iq { 0 }
900              
901             package DBIx::BulkUtil::BLK;
902              
903 1     1   7 use Carp qw(confess);
  1         2  
  1         234  
904              
905             sub execute {
906 0     0     my $self = shift;
907 0 0         unless (@_ == $self->{ARG_LEN}) {
908 0           my $arg_cnt = @_;
909 0           confess "Execute argument count $arg_cnt must be $self->{ARG_LEN}";
910             }
911 0           my $f = $self->{ARG_FUNC};
912 0           my $rows = $self->{EXEC_FUNC}->(@_);
913 0           my $cnt = $self->{CNT};
914 0 0         if ( ++$$cnt >= $self->{COMMIT_SIZE} ) {
915 0           $rows = $self->{COMMIT_FUNC}->();
916 0           $$cnt = 0;
917             }
918 0           return $rows;
919             }
920              
921             sub finish {
922 0     0     my $self = shift;
923 0           $self->{FINISH_FUNC}->();
924             }
925              
926             package DBIx::BulkUtil::Sybase;
927              
928 1     1   6 use Carp qw(confess);
  1         1  
  1         2168  
929              
930             our @ISA = qw(DBIx::BulkUtil::Obj);
931              
932 0     0     sub now { 'getdate()' };
933              
934             sub add {
935 0     0     my $self = shift;
936 0           my $date = shift;
937 0           my $n = shift;
938 0           my $unit = shift;
939 0           my $new_date = "dateadd( $unit, $n, $date)";
940 0 0         return $new_date unless @_;
941 0           return $self->add( $new_date, @_ );
942             }
943              
944             sub diff {
945 0     0     my $self = shift;
946 0           my $date1 = shift;
947 0           my $date2 = shift;
948 0           my $unit = shift;
949 0           my $new_date = "datediff( $unit, $date1, $date2)";
950 0           return $new_date;
951             }
952              
953             sub row_select {
954 0     0     my $self = shift;
955 0           my $sel = shift;
956 0           return "select $sel";
957             }
958              
959             sub sp_sth {
960 0     0     my $self = shift;
961 0           my $sth = $self->{DBH}->prepare($self->sp_sql(@_));
962 0           $sth->execute();
963 0           return $sth;
964             }
965              
966             sub sp_sql {
967 0     0     my $self = shift;
968 0           my ($stored_proc, @args) = @_;
969 0           return "exec " . join(" ", $stored_proc, join(",", map {$self->{DBH}->quote($_)} grep !/^:cursor$/, @args));
  0            
970             }
971              
972             # This is trivial in Sybase, but a necessary function for Oracle
973             # and so makes this portably compatible
974             sub to_datetime {
975 0     0     my $self = shift;
976 0           my $date = shift;
977              
978 0           return "'$date'";
979             }
980              
981             sub bcp_in {
982 0     0     my $self = shift;
983 0 0         my $optref = (ref $_[-1]) ? pop @_ : {};
984 0           my %opts = %$optref;
985              
986 0           my ( $table, $file, $dir ) = @_;
987 0 0         my $partition = ( $table =~ s/(:\d+)$// ) ? $1 : '';
988              
989 0   0       $file ||= "$table.bcp";
990 0   0       $dir ||= 'in';
991              
992 0           my $dbh = $self->{DBH};
993 0           my $db = $dbh->{Name};
994 0 0         $db =~ /server=(\w+)/ or confess "Can't determine server for bcp";
995 0           my $server = $1;
996 0           my $database = $self->curr_db();
997              
998 0           my $user = $dbh->{Username};
999 0   0       my $delimiter = $opts{Delimiter} || $self->{DELIMITER};
1000 0   0       my $row_delimiter = $opts{RowDelimiter} || "\n";
1001 0   0       my $commit_size = $opts{CommitSize} || 1000;
1002              
1003 0 0 0       my $bcp_table =
    0          
    0          
1004             (!$database or $table =~ /^\w+\.\w*\.\w+$/) ? $table
1005             : ($table =~ /^\w+$/) ? "$database..$table"
1006             : ($table =~ /^\w*\.\w+$/) ? "$database.$table"
1007             : confess "Can not determine database for bcp";
1008              
1009 0           $bcp_table .= $partition;
1010              
1011             # Simulate Oracle sqlldr Append/Replace/Truncate
1012 0           my $id_cnt;
1013 0 0         if ( $dir eq 'in' ) {
1014 0   0       my $mode = $opts{Action} || "A";
1015 0 0         if ( $mode eq 'T' ) {
    0          
1016 0           my $sql = "TRUNCATE TABLE $bcp_table";
1017 0           print "Executing: $sql\n";
1018 0           $dbh->do($sql);
1019             } elsif ($mode eq 'R') {
1020 0           $self->delete($bcp_table, '', $commit_size);
1021             }
1022 0 0         confess "BCP file $file does not exist" unless -f $file;
1023              
1024             # Save some work
1025             # checking underscore ok, we just did -f above
1026 0 0         unless ( -s _ ) {
1027 0           print "$file is empty. Skipping bcp\n";
1028              
1029             # Make any log file parsers happy
1030 0           print "0 rows copied\n";
1031 0           return 0;
1032             }
1033              
1034             # All this to decide whether or not to use '-E'
1035             # Only use '-E' if there is an identity column
1036             # And GenerateId is false
1037 0 0         unless ( $opts{GenerateId} ) {
1038 0           my $col_info = $self->column_info($table);
1039 0           my $col_map = $col_info->{MAP};
1040 0 0         if ($col_map) {
1041 0           for my $c ( values %$col_map ) {
1042 0 0 0       ++$id_cnt and last if $c->{TYPE_NAME} =~ /identity/;
1043             }
1044             }
1045             }
1046             }
1047              
1048 0 0         my ($action,$to_from) = ($dir eq 'in') ? ('Loading', 'from') : ('Exporting', 'to');
1049 0           print "$action $server/$bcp_table $to_from $file\n";
1050              
1051 0           my (@max_err_opt, @commit_opt, @header_opt, @id_opt);
1052 0   0       my $max_err_cnt = $opts{MaxErrors} || 0;
1053 0 0         if ( $dir eq 'in' ) {
1054 0           @max_err_opt = (-m => $max_err_cnt);
1055 0           @commit_opt = (-b => $commit_size);
1056 0 0         @header_opt = (-F => $opts{Header}+1) if $opts{Header};
1057 0 0         @id_opt = "-E" if $id_cnt;
1058             }
1059              
1060 0   0       my $keep_temp = $opts{KeepTempFiles} || $opts{Debug};
1061 0   0       my $in_temp_dir = $opts{TempDir} || $opts{Debug};
1062 0           my $temp_dir;
1063 0 0 0       $temp_dir = $opts{TempDir} || "." if $in_temp_dir;
1064              
1065 0           require File::Temp;
1066 0 0         my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
1067 0 0         my @unlink = $keep_temp ? (UNLINK => 0) : ();
1068 0           my $error_file = File::Temp->new(
1069             TEMPLATE => "${table}_XXXXX",
1070             SUFFIX => ".err",
1071             @temp_dir, @unlink,
1072             );
1073 0           chmod(0664, $error_file->filename());
1074 0           $error_file->close();
1075              
1076 0 0         my @packet_size = $opts{PacketSize} ? ( -A => $opts{PacketSize} ) : ();
1077 0 0         my @passthru = $opts{PassThru} ? @{$opts{PassThru}} : ();
  0            
1078              
1079 0           my ( $fmt_file, $tmp_fmt_file );
1080 0 0 0       if ( $opts{FormatFile} ) {
    0 0        
      0        
1081 0           $fmt_file = $opts{FormatFile};
1082 0           } elsif ( ( $opts{ColumnList} && $opts{ColumnList} ) || ( $opts{Filler} && @{$opts{Filler}} ) ) {
1083             ($tmp_fmt_file,$fmt_file) = $self->mk_fmt_file(
1084             Table => $table,
1085             Delimiter => $delimiter,
1086             RowDelimiter => $row_delimiter,
1087             ColumnList => $opts{ColumnList},
1088             Filler => $opts{Filler},
1089             TempDir => $opts{TempDir},
1090             FormatFileName => $opts{FormatFileName},
1091 0           KeepTempFiles => $keep_temp,
1092             );
1093             }
1094 0 0         my @fmt_file_opt = $fmt_file ? ( -f => $fmt_file ) : '-c';
1095              
1096             # UTF-8 doesn't work on HP - default is roman8 on HP
1097             # Should probably make '-J' some kind of option, with maybe
1098             # a map of OS types and default values. But leave that for
1099             # a later date.
1100 0           my @cmd = ( bcp => $bcp_table, $dir, $file,
1101             -U => $user,
1102             #-J => "utf8",
1103             -S => $server,
1104             -t => $delimiter,
1105             -r => $row_delimiter,
1106             -e => $error_file->filename(),
1107             @header_opt,
1108             @id_opt,
1109             @commit_opt,
1110             @max_err_opt,
1111             @packet_size,
1112             @passthru,
1113             @fmt_file_opt,
1114             );
1115 0           print "Executing: @cmd\n";
1116 0           push @cmd, -P => $self->{PASSWORD};
1117 0 0         open(my $fh, "-|", @cmd) or confess "Can't exec bcp: $!";
1118              
1119 0           my ($rows, $failed, $partially_failed);
1120 0           local ($_, $.);
1121              
1122 0           my $err_cnt = my $c_lib_err_cnt = my $srvr_err_cnt = 0;
1123 0           while (<$fh>) {
1124 0           print;
1125 0 0         if ( /^(Server|C[TS]LIB) Message/ ) {
1126 0           my $msg_type = $1;
1127 0 0         if ( $msg_type eq 'CSLIB' ) {
    0          
1128 0 0         if ( m|/N(\d+)| ) {
1129             # Sybase says truncation is not an error, so we will too
1130             # Or else we might get > 1 error on the same row
1131 0 0         unless ( $1 == 36 ) {
1132 0           $err_cnt++;
1133 0           $c_lib_err_cnt++;
1134             }
1135             }
1136             } elsif ( $msg_type eq 'CTLIB' ) {
1137 0           $err_cnt++;
1138 0           $c_lib_err_cnt++;
1139             } else {
1140             # On server errors the whole batch is an error
1141 0 0         if ( /\s(\d+)/ ) {
1142             # Ignore 'slow bcp' warning
1143 0 0         unless ( $1 == 4852 ) {
1144 0           $err_cnt += $commit_size;
1145 0           $srvr_err_cnt += $commit_size;
1146             }
1147             } else {
1148 0           $err_cnt += $commit_size;
1149 0           $srvr_err_cnt += $commit_size;
1150             }
1151             }
1152             }
1153 0 0         $rows = $1 if /^(\d+) rows copied/;
1154              
1155             # failed or partially failed
1156 0 0         if ( /^bcp copy in ((?:partially )?)failed/ ) {
1157 0 0         $partially_failed++ if $1;
1158 0           $failed++;
1159             }
1160             }
1161              
1162             # "NaN" (literally "NaN") to numeric errors
1163             # do not show up on STDOUT.
1164             # So we may as well search the err file to count
1165             # all CSLIB and CTLIB errors.
1166             # Truncation errors do not show up in file, so we
1167             # don't have to filter them out as we would if we
1168             # were parsing STDOUT.
1169 0           my $err_file_cnt = 0;
1170 0 0         open(my $err_h, "<", $error_file->filename()) or die "Failed to open $error_file: $!";
1171 0           while (<$err_h>) {
1172 0 0         $err_file_cnt++ if /^#@ Row \d+: Not transferred/;
1173             }
1174 0           close $err_h;
1175              
1176 0 0         if ( $err_file_cnt > $c_lib_err_cnt ) {
1177 0           $err_cnt += $err_file_cnt - $c_lib_err_cnt;
1178             }
1179              
1180             # BCP 11.x,12.x returns meaningful exit status
1181             # 10.x does not (returns 0 even on errors)
1182 0           my $close_success = close $fh;
1183              
1184 0 0         unless ($close_success) {
1185 0           my $exit_stat = $? >> 8;
1186 0           my $exit_sig = $? & 127;
1187 0           my $exit_core = $? & 128;
1188              
1189             # bcp will exit with non-zero status on any 'Server' error,
1190             # but not on 'CSLIB' errors unless 'CSLIB' error count exceeds max.
1191 0 0         if ( $exit_stat != 0 ) {
1192 0 0         if ( $dir eq 'in' ) {
1193              
1194             # Some of this may seem unneccessary, but Sybase bcp is
1195             # horribly inconsistent.
1196              
1197             # Exceeded the error count
1198 0 0         confess "BCP error - max error count ($max_err_cnt) exceeded - bcp returned status $exit_stat: $!"
1199             if $err_cnt > $max_err_cnt;
1200              
1201             # The load was aborted before bcp indicated that it finished
1202 0 0 0       confess "BCP error - bcp aborted [$exit_stat]: $!"
1203             if !defined($rows) and !$failed;
1204              
1205             # BCP failed - even if we allow some errors on a small file, if zero rows are copied
1206             # then call it a total failure.
1207 0 0 0       confess "BCP error - bcp failed [$exit_stat]: $!"
1208             if $failed and !$partially_failed;
1209              
1210             } else {
1211 0           confess "BCP error - bcp returned status $exit_stat: $!";
1212             }
1213             }
1214              
1215 0 0         confess "BCP error - bcp recieved signal $exit_sig" if $exit_sig > 0;
1216 0 0         confess "BCP error - bcp coredumped" if $exit_core;
1217             }
1218              
1219             # Will miss error count exceeded error on 10.x
1220             # But will catch other errors if load is aborted
1221             # Or no rows are loaded.
1222 0 0         confess "BCP error - no rows copied" if !defined($rows);
1223              
1224             # CTLIB errors do not cause non-zero exit - so catch them here
1225 0 0         confess "BCP error - max error count ($max_err_cnt) exceeded" if $err_cnt > $max_err_cnt;
1226 0   0       $rows ||= 0;
1227 0           return $rows;
1228             }
1229              
1230             {
1231 1     1   5 no warnings 'once';
  1         2  
  1         6248  
1232             *bcp = \&bcp_in;
1233             }
1234              
1235             sub mk_fmt_file {
1236 0     0     my $self = shift;
1237 0           my %opts = @_;
1238              
1239 0   0       my $table = $opts{Table} || die "Table required for mk_fmt_file";
1240 0           my $col_info = $self->column_info($table);
1241 0           my $db_col_list = $col_info->{LIST};
1242 0           my %is_db_column;
1243 0           $is_db_column{$_}++ for @$db_col_list;
1244 0           my %is_filler;
1245 0 0         if ( $opts{Filler} ) {
1246 0           $is_filler{lc($_)}++ for @{$opts{Filler}};
  0            
1247             }
1248              
1249 0           my ($tmp_fmt_file,$fmt_file);
1250 0 0         if ( $opts{FormatFileName} ) {
1251 0           $fmt_file = $opts{FormatFileName};
1252             } else {
1253 0           require File::Temp;
1254 0   0       my $keep_temp = $opts{KeepTempFiles} || $opts{Debug};
1255 0   0       my $in_temp_dir = $opts{TempDir} || $opts{Debug};
1256 0           my $temp_dir;
1257              
1258 0 0 0       $temp_dir = $opts{TempDir} || "." if $in_temp_dir;
1259 0 0         my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
1260 0 0 0       my @unlink = ( $keep_temp || !defined(wantarray) ) ? (UNLINK => 0) : ();
1261 0           $tmp_fmt_file = File::Temp->new(
1262             TEMPLATE => "${table}_XXXXX",
1263             SUFFIX => ".fmt",
1264             @temp_dir, @unlink,
1265             );
1266 0           $fmt_file = $tmp_fmt_file->filename();
1267 0           chmod(0664, $tmp_fmt_file);
1268 0           $tmp_fmt_file->close();
1269             }
1270              
1271 0   0       my $delim = $opts{Delimiter} || "|";
1272 0   0       my $row_delim = $opts{RowDelimiter} || "\n";
1273              
1274             # Need escaped text in fmt file
1275             # for CR/LF
1276 0           for ($delim,$row_delim) {
1277 0           s/\n/\\n/g;
1278 0           s/\r/\\r/g;
1279             }
1280              
1281             my @col_list = ( $opts{ColumnList} && @{$opts{ColumnList}} )
1282 0           ? @{$opts{ColumnList}}
1283 0 0 0       : @{$col_info->{LIST}};
  0            
1284              
1285 0           my $ncols = @col_list;
1286 0 0         open( my $fh, ">", $fmt_file ) or confess "Failed to open $fmt_file: $!";
1287 0           print $fh "10.0\n";
1288 0           print $fh "$ncols\n";
1289              
1290 0           my $col_map = $col_info->{MAP};
1291 0           for my $i (1..$ncols) {
1292 0           my $name = $col_list[$i-1];
1293 0 0         my $d = ( $i == $ncols ) ? $row_delim : $delim;
1294 0           my @row = ($i, 'SYBCHAR', 0);
1295 0 0         if ($is_filler{lc($name)}) {
    0          
1296 0           push @row, 0, qq["$d"], 0;
1297             } elsif ($is_db_column{lc($name)}) {
1298 0           my $info = $col_map->{lc($name)};
1299              
1300             # Native Sybase date format size is 26 though metadata says 23
1301             # For numbers, add extra for decimal
1302             my $size =
1303             ( $info->{TYPE_NAME} =~ /date/ ) ? 26
1304             : ( $info->{TYPE_NAME} =~ /char|text/ ) ? $info->{COLUMN_SIZE}
1305 0 0         : $info->{COLUMN_SIZE} + 1;
    0          
1306 0           push @row, $size, qq["$d"], $info->{ORDINAL_POSITION}, $name;
1307 0           } else { confess "$name is neither a db nor filler column" }
1308 0           print $fh join("\t", @row), "\n";
1309             }
1310              
1311 0           close $fh;
1312              
1313              
1314             # Also return temp object so it will not be cleaned up yet
1315             return
1316 0 0         wantarray ? ($tmp_fmt_file, $fmt_file)
    0          
1317             : $tmp_fmt_file ? $tmp_fmt_file
1318             : $fmt_file;
1319              
1320             }
1321              
1322             sub bcp_out {
1323 0     0     my $self = shift;
1324 0           my @opts;
1325 0 0         if (ref $_[-1]) {
1326 0           @opts = pop @_;
1327             }
1328 0           my ($table, $file) = @_;
1329 0   0       $file ||= "$table.bcp";
1330              
1331 0 0 0       my $scratchdb = @opts ? $opts[0]{TempDb} || 'scratchdb' : 'scratchdb';
1332              
1333             # Sybase rounds money columns, need to bcp a view of it
1334             # if any exist.
1335 0           my $dbh = $self->{DBH};
1336              
1337             # Need to save current db in case view is created
1338 0           my $curr_db = $self->curr_db();
1339 0           my $view = $self->mk_view($table, @opts);
1340              
1341 0   0       my $rows = eval { $self->bcp($view || $table, $file, 'out', @opts) };
  0            
1342 0 0         unless (defined $rows) {
1343 0           my $err = $@;
1344 0 0         if ($view) {
1345 0           warn "BCP error detected - dropping view $view\n";
1346 0           my $result = eval { $dbh->do("DROP VIEW $view") };
  0            
1347 0 0         warn "Unable to drop view $view: $@" unless $result;
1348 0 0 0       $dbh->do("USE $curr_db") if !$self->is_iq() and $curr_db;
1349             }
1350 0           confess $err;
1351             }
1352              
1353 0 0         if ($view) {
1354 0           print "Dropping view $view\n";
1355 0           $dbh->do("DROP VIEW $view");
1356 0 0 0       $dbh->do("USE $curr_db") if !$self->is_iq() and $curr_db;
1357             }
1358 0 0 0       if ( !@opts or !$opts[0]{NoFix} ) {
1359 0           my $bak = eval { $self->fix_bcp_file($file, @opts) };
  0            
1360 0 0         if ( $bak ) {
1361 0           unlink $bak;
1362             } else {
1363 0           warn "Error processing $file. BCP file in $file.bak: $@\n";
1364 0           return;
1365             }
1366             }
1367 0 0 0       if ( @opts and ( $opts[0]{Header} || $opts[0]{QuoteFields} ) ) {
      0        
1368 0           my $bak = eval { $self->add_header($table, $file, @opts) };
  0            
1369 0 0         if ( $bak ) {
1370 0           unlink $bak;
1371             } else {
1372 0           warn "Error post processing $file. BCP file in $file.bak: $@\n";
1373 0           return;
1374             }
1375             }
1376 0           return $rows;
1377             }
1378              
1379             sub mk_view {
1380              
1381 0     0     my ($self,$table) = @_;
1382 0           my @opts;
1383 0 0         @opts = pop @_ if ref $_[-1];
1384 0 0 0       my $scratchdb = @opts ? $opts[0]{TempDb} || 'scratchdb' : 'scratchdb';
1385              
1386             # Sybase rounds money columns, need to bcp a view of it
1387             # if any exist.
1388 0           my $dbh = $self->{DBH};
1389              
1390 0           my $col_info = $self->column_info($table);
1391              
1392             # Columns might be a string from a SELECT clause
1393             # Or it might be an arrayref of columns
1394             my $col_list = ( @opts && $opts[0]{Columns} )
1395             ? $opts[0]{Columns}
1396 0 0 0       : $col_info->{LIST};
1397              
1398 0           my $col_map = $col_info->{MAP};
1399              
1400 0           my @columns;
1401 0           my $money_cnt = 0;
1402              
1403 0           my $column_str;
1404 0 0         if ( ref $col_list ) {
1405 0           for my $name (@$col_list) {
1406 0           my $col_name = $name;
1407 0 0         if ( my $info = $col_map->{$name} ) {
1408 0           my $type = $info->{TYPE_NAME};
1409 0           $col_name = $info->{COLUMN_NAME};
1410 0 0         if ($type =~ /money/) {
1411 0           $money_cnt++;
1412 0 0         my $len = ($type =~ /small/) ? 10 : 19;
1413 0           $col_name = "convert(decimal($len,4), $col_name) $col_name";
1414             }
1415             }
1416 0           push @columns, $col_name;
1417             }
1418 0           $column_str = join ",", @columns;
1419             } else {
1420 0           $column_str = $col_list;
1421             }
1422              
1423 0 0 0       return if $money_cnt==0 and !$opts[0]{Filter} and !$opts[0]{Columns};
      0        
1424              
1425 0           my ($view, $db_view);
1426              
1427 0           my $curr_db = $self->curr_db();
1428 0 0 0       if ( !$curr_db and $table =~ /^(\w+)\.\w*\.\w+$/ ) {
1429 0           $curr_db = $1;
1430             }
1431 0 0         confess "Can not determine database" unless $curr_db;
1432              
1433 0 0 0       my $base_table =
    0          
    0          
1434             (!$curr_db or $table =~ /^\w+\.\w*\.\w+$/) ? $table
1435             : ($table =~ /^\w+$/) ? "$curr_db..$table"
1436             : ($table =~ /^\w*\.\w+$/) ? "$curr_db.$table"
1437             : confess "Can not determine database for view";
1438              
1439 0           ( my $tmp_view = $base_table ) =~ s/.*\.//;
1440 0 0         $tmp_view = substr($tmp_view, 0, 19) if length($tmp_view) > 19;
1441              
1442 0 0         $dbh->do("USE $scratchdb") unless $self->is_iq();
1443              
1444 0           my $cnt;
1445 0           while (1) {
1446 0           my ($sec, $min, $hr) = localtime;
1447 0           my $id = sprintf("%05d%02d%02d%02d", $$, $hr, $min, $sec);
1448              
1449 0           $view = "${tmp_view}${id}";
1450 0 0         $db_view = $self->is_iq() ? $view : "$scratchdb..$view";
1451 0           my $sql = sprintf(
1452             "CREATE VIEW %s AS SELECT %s FROM %s",
1453             $view,
1454             $column_str,
1455             $base_table,
1456             );
1457 0 0 0       $sql .= " $opts[0]{Filter}" if @opts && $opts[0]{Filter};
1458 0           print "Creating view $db_view\n";
1459 0           print "Executing: $sql\n";
1460 0           my $result = eval { $dbh->do($sql) };
  0            
1461 0 0         return $view if $result;
1462 0 0         confess $@ unless $@ =~ /already an object/;
1463 0           $cnt++;
1464 0 0         confess "Too many retries trying to create view $db_view. Aborting"
1465             if $cnt > 20;
1466 0           print "View $db_view already exists, retrying #$cnt...";
1467 0           sleep 2;
1468             }
1469              
1470             }
1471              
1472             # Fix native date format from Sybase bcp out
1473             { my %mons = qw( Jan 1 Feb 2 Mar 3 Apr 4 May 5 Jun 6 Jul 7 Aug 8 Sep 9 Oct 10 Nov 11 Dec 12 );
1474             my $mon_str = join '|', keys %mons;
1475             my $mon_re = qr/$mon_str/;
1476              
1477             sub fix_bcp_file {
1478 0     0     my ( $self, $file ) = @_;
1479 0           my $opts = {};
1480 0 0         if (ref $_[-1]) {
1481 0           $opts = pop @_;
1482             }
1483 0   0       my $delimiter = $opts->{Delimiter} || $self->{DELIMITER} || '|';
1484 0           my $dre = quotemeta($delimiter);
1485 0           local ($_, $., $ARGV, *ARGV);
1486 0           local ( $^I, @ARGV ) = ( '.bak', $file );
1487 0   0       local $/ = $opts->{RowDelimiter} || $/;
1488 0           while ( <> ) {
1489 0           1 while s!(^|$dre)($mon_re)\s{1,2}(\d{1,2})\s(\d{4})\s\s?(\d\d?):(\d\d):(\d\d):(\d{3})([AP])M($dre|$/)!
1490             $1 .
1491             sprintf( '%04d-%02d-%02d %02d:%02d:%02d.%03d',
1492             $4,
1493 0 0 0       $mons{ $2 },
    0 0        
1494             $3,
1495             ( $9 eq 'P' && $5 < 12) ? $5 + 12 : ( $9 eq 'A' && $5 == 12 ) ? 0 : $5,
1496             $6,
1497             $7,
1498             $8 ) .
1499             $10
1500             !eg;
1501 0           1 while s!(^|$dre)($mon_re)\s{1,2}(\d{1,2})\s(\d{4})\s\s?(\d\d?):(\d\d)([AP])M($dre|$/)!
1502             $1 .
1503             sprintf( '%04d-%02d-%02d %02d:%02d',
1504             $4,
1505 0 0 0       $mons{ $2 },
    0 0        
1506             $3,
1507             ( $7 eq 'P' && $5 < 12) ? $5 + 12 : ( $7 eq 'A' && $5 == 12 ) ? 0 : $5,
1508             $6 ) .
1509             $8
1510             !eg;
1511 0           1 while s!(^|$dre)($mon_re)\s{1,2}(\d{1,2})\s(\d{4})($dre|$/)!
1512             $1 .
1513             sprintf( '%04d-%02d-%02d',
1514             $4,
1515 0           $mons{ $2 },
1516             $3 ) .
1517             $5
1518             !eg;
1519 0           print;
1520             }
1521 0           return "$file.bak";
1522             }
1523             }
1524              
1525             {
1526             my %type_map = ( 'V' => 'V', 'P' => 'P', 'U' => 'T' );
1527              
1528             sub obj_type {
1529 0     0     my ( $self, $name ) = @_;
1530 0           my $dbh = $self->{DBH};
1531 0           my $qname = $dbh->quote($name);
1532 0           my ( $type ) = $dbh->selectrow_array("select type from sysobjects where name = $qname");
1533 0 0         return unless $type;
1534 0   0       return $type_map{$type} || confess "Don't know about type $type for object $name";
1535             }
1536             }
1537              
1538             sub curr_db {
1539 0     0     my $self = shift;
1540              
1541 0           $self->get('db_name()');
1542             }
1543              
1544 0     0     sub curr_schema { undef }
1545              
1546             {
1547              
1548             # Can get errors in some databases if you don't add dbo to everything
1549             my $sql_t = <
1550             SELECT
1551             dbo.sysindexes.name,
1552             index_col(object_name(dbo.sysindexes.id), dbo.sysindexes.indid, dbo.syscolumns.colid) col_name
1553             FROM dbo.sysindexes, dbo.syscolumns
1554             WHERE dbo.sysindexes.id = dbo.syscolumns.id
1555             AND dbo.syscolumns.colid <= dbo.sysindexes.keycnt
1556             AND dbo.sysindexes.id = object_id(%s)
1557             SQL
1558              
1559             sub index_info {
1560 0     0     my ( $self, $table, $all_indexes ) = @_;
1561              
1562 0           my ($tmp_db, $curr_db) = (undef,'');
1563 0           my $dbh = $self->{DBH};
1564              
1565 0           my $schema = '';
1566 0           $tmp_db = $curr_db = $self->curr_db();
1567              
1568 0 0         if ( $table =~ /^(?:(\w+)\.)?(\w*)\.(\w+)$/ ) {
1569 0           ($tmp_db, $schema, $table) = ($1,$2,$3);
1570 0           $table = "$schema.$table";
1571              
1572             # We can only get info on the current database
1573 0 0 0       if ( defined($tmp_db) and $tmp_db ne $curr_db ) {
1574 0           $dbh->do("USE $tmp_db");
1575             }
1576             }
1577              
1578 0           my $sql = sprintf $sql_t, $dbh->quote($table);
1579 0 0         $sql .= "AND dbo.sysindexes.status & 2 = 2\n" unless $all_indexes;
1580 0           $sql .= "ORDER BY dbo.syscolumns.colid\n";
1581 0           my $sth = $dbh->prepare($sql);
1582 0           $sth->execute();
1583 0           my @col_names = @{$sth->{NAME_lc}};
  0            
1584 0           my %row; $sth->bind_columns(\@row{@col_names});
  0            
1585 0           my %ind;
1586 0           while ($sth->fetch()) {
1587 0 0         if ( $row{col_name} ) {
1588 0           push @{$ind{$row{name}}}, lc($row{col_name});
  0            
1589             }
1590             }
1591              
1592 0 0 0       $dbh->do("USE $curr_db") if defined($tmp_db) and $tmp_db ne $curr_db;
1593              
1594 0 0         return unless %ind;
1595 0           return \%ind;
1596             }
1597              
1598             }
1599              
1600             sub primary_key {
1601 0     0     my ( $self, $table ) = @_;
1602 0           my $schema;
1603 0           my ($tmp_db, $curr_db) = (undef,'');
1604 0           my $dbh = $self->{DBH};
1605              
1606 0           $tmp_db = $curr_db = $self->curr_db();
1607              
1608 0 0         if ( $table =~ /^(?:(\w+)\.)?(\w*)\.(\w+)$/ ) {
1609 0           ($tmp_db, $schema, $table) = ($1,$2,$3);
1610 0   0       $schema ||= undef;
1611              
1612             # We can only get column info on the current database
1613 0 0 0       $dbh->do("USE $tmp_db") if defined($tmp_db) and $tmp_db ne $curr_db;
1614             }
1615              
1616 0           my @pk = $self->{DBH}->primary_key($tmp_db, $schema, $table);
1617              
1618 0 0 0       $dbh->do("USE $curr_db") if defined($tmp_db) and $tmp_db ne $curr_db;
1619              
1620 0 0         return unless @pk;
1621              
1622 0           return \@pk;
1623             }
1624              
1625             {
1626              
1627             my $del_sql = <
1628             DELETE %s
1629             FROM %s d, %s s
1630             WHERE %s
1631             SQL
1632              
1633             my $ins_sql = <
1634             SELECT %s
1635             FROM %s
1636             SQL
1637              
1638             sub merge {
1639 0     0     my $self = shift;
1640 0           my %args = @_;
1641 0           my $dbh = $self->{DBH};
1642              
1643 0           my $table = lc($args{Table});
1644 0           my $stg_table = lc($args{StgTable});
1645              
1646 0           my $tbl_info = $self->column_info($table);
1647 0           my $tbl_map = $tbl_info->{MAP};
1648              
1649 0           my $stg_info = $self->column_info($stg_table);
1650 0           my $stg_map = $stg_info->{MAP};
1651              
1652 0           my %stg_has; $stg_has{$_}++ for @{$stg_info->{LIST}};
  0            
  0            
1653              
1654 0 0 0       my $key_col_ref = ($args{KeyCols} && @{$args{KeyCols}}) ? $args{KeyCols} : $self->key_columns($table);
1655 0 0 0       my $upd_col_ref = ($args{UpdCols} && @{$args{UpdCols}}) ? $args{UpdCols} : $self->upd_columns($table, $key_col_ref);
1656              
1657 0           my @key_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$key_col_ref;
1658 0           my %is_key_col;
1659 0           $is_key_col{$_}++ for map lc, @$key_col_ref;
1660 0           my @upd_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$upd_col_ref;
1661 0           my %is_upd_col;
1662 0           $is_upd_col{$_}++ for map lc, @$upd_col_ref;
1663              
1664 0           my %tmp_col_map;
1665 0 0         %tmp_col_map = map lc, %{$args{ColMap}} if $args{ColMap};
  0            
1666              
1667             # Column map for upd statement, which must map the correct case
1668             # to the correct case.
1669 0           my %col_map = map {( $_ => (
1670             $tmp_col_map{lc($_)}
1671             ? $stg_map->{lc($tmp_col_map{lc($_)})}{COLUMN_NAME}
1672             : $stg_map->{lc($_)}{COLUMN_NAME}
1673 0 0         ))} @key_cols, @upd_cols;
1674              
1675             # Correctly cased field list for bcp select from stage table statement
1676             # Either it's in the explicit column map, or it's a key or upd column
1677             # with the same name as the target table,
1678             # or it can be last_chg_user or date
1679             my @fields = map {
1680             ($_ eq 'last_chg_user' && !$stg_has{last_chg_user}) ? 'suser_name()'
1681             : ($_ eq 'last_chg_date' && !$stg_has{last_chg_date}) ? 'getdate()'
1682             : $tmp_col_map{$_} ? $stg_has{$tmp_col_map{$_}} ? $stg_map->{$tmp_col_map{$_}}{COLUMN_NAME} : $tmp_col_map{$_}
1683             : ( $is_key_col{$_} || $is_upd_col{$_} ) ? $stg_has{$_} ? $stg_map->{$_}{COLUMN_NAME} : ()
1684             : $stg_map->{$_} ? $stg_map->{$_}{COLUMN_NAME}
1685 0 0 0       : confess "Failed to map target column $table.$_"
    0 0        
    0 0        
    0          
    0          
    0          
    0          
1686 0           } @{$tbl_info->{LIST}};
  0            
1687 0           my $field_str = join(",", @fields);
1688              
1689 0   0       my $key_col_str = join("\nAND ", map "d.$_=s.".($col_map{$_}||$_), @key_cols);
1690              
1691 0           my $del_merge_sql = sprintf($del_sql,
1692             $table,
1693             $table, $stg_table,
1694             $key_col_str,
1695             );
1696 0           print("Executing: $del_merge_sql\n");
1697              
1698 0 0         unless ($args{NoExec}) {
1699 0           my $del_rows = $dbh->do($del_merge_sql) + 0;
1700 0           print("$del_rows rows deleted from $table\n\n");
1701             }
1702              
1703 0           my $ins_merge_sql = sprintf($ins_sql,
1704             $field_str,
1705             $stg_table,
1706             );
1707 0           print("Inserting to $table: $ins_merge_sql\n");
1708              
1709 0 0         return 1 if $args{NoExec};
1710              
1711 0 0 0       my $ins_rows = ( $args{NoBCP} or ($stg_table =~ /^#/) )
1712             ? $dbh->do("INSERT INTO $table\n$ins_merge_sql") + 0
1713             : $self->bcp_sql($table, $ins_merge_sql) + 0;
1714 0           print("$ins_rows rows inserted to $table\n\n");
1715              
1716 0           return 1;
1717             }
1718             }
1719              
1720             # This merge is destructive to the staging table
1721             # Only 'new' rows will be left in the staging table
1722             {
1723              
1724             my $upd_sql = <
1725             UPDATE %s
1726             SET %s
1727             FROM %s d,%s s
1728             WHERE %s
1729             SQL
1730              
1731             my $del_sql = <
1732             DELETE %s
1733             FROM %s s, %s d
1734             WHERE %s
1735             SQL
1736              
1737             my $ins_sql = <
1738             SELECT %s
1739             FROM %s
1740             SQL
1741              
1742             sub merge2 {
1743 0     0     my $self = shift;
1744 0           my %args = @_;
1745 0           my $dbh = $self->{DBH};
1746              
1747 0           my $table = lc($args{Table});
1748 0           my $stg_table = lc($args{StgTable});
1749              
1750 0           my $tbl_info = $self->column_info($table);
1751 0           my $tbl_map = $tbl_info->{MAP};
1752              
1753 0           my $stg_info = $self->column_info($stg_table);
1754 0           my $stg_map = $stg_info->{MAP};
1755 0           my %stg_has; $stg_has{$_}++ for @{$stg_info->{LIST}};
  0            
  0            
1756              
1757 0 0 0       my $key_col_ref = ($args{KeyCols} && @{$args{KeyCols}}) ? $args{KeyCols} : $self->key_columns($table);
1758 0 0 0       my $upd_col_ref = ($args{UpdCols} && @{$args{UpdCols}}) ? $args{UpdCols} : $self->upd_columns($table);
1759              
1760 0           my @key_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$key_col_ref;
1761 0           my %is_key_col;
1762 0           $is_key_col{$_}++ for map lc, @$key_col_ref;
1763 0           my @upd_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$upd_col_ref;
1764 0           my %is_upd_col;
1765 0           $is_upd_col{$_}++ for map lc, @$upd_col_ref;
1766              
1767 0           my %tmp_col_map;
1768 0 0         %tmp_col_map = map lc, %{$args{ColMap}} if $args{ColMap};
  0            
1769              
1770             # Column map for upd statement, which must map the correct case
1771             # to the correct case.
1772 0           my %col_map = map {( $_ => (
1773             $tmp_col_map{lc($_)}
1774             ? $stg_map->{lc($tmp_col_map{lc($_)})}{COLUMN_NAME}
1775             : $stg_map->{lc($_)}{COLUMN_NAME}
1776 0 0         ))} @key_cols, @upd_cols;
1777              
1778             # Correctly cased field list for bcp select from stage table statement
1779             # Either it's in the explicit column map, or it's a key or upd column
1780             # with the same name as the target table,
1781             # or it can be last_chg_user or date
1782             my @fields = map {
1783             ($_ eq 'last_chg_user' && !$stg_has{last_chg_user}) ? 'suser_name()'
1784             : ($_ eq 'last_chg_date' && !$stg_has{last_chg_date}) ? 'getdate()'
1785             : $tmp_col_map{$_} ? $stg_map->{$tmp_col_map{$_}}{COLUMN_NAME}
1786             : ( $is_key_col{$_} || $is_upd_col{$_} ) ? $stg_map->{$_}{COLUMN_NAME}
1787             : $stg_map->{$_} ? $stg_map->{$_}{COLUMN_NAME}
1788 0 0 0       : confess "Failed to map target column $table.$_"
    0 0        
    0 0        
    0          
    0          
1789 0           } @{$tbl_info->{LIST}};
  0            
1790 0           my $field_str = join(",", @fields);
1791              
1792 0   0       my $key_col_str = join("\nAND ", map "d.$_=s.".($col_map{$_}||$_), @key_cols);
1793 0   0       my $upd_col_str = join(",", map "$_=s.".($col_map{$_}||$_), @upd_cols);
1794              
1795             # Determine if last_chg_user, last_chg_date need to be updated
1796 0           my %chg_col = $self->last_chg_list($table, \@fields);
1797 0           for my $col ( sort { $b cmp $a } keys %chg_col ) {
  0            
1798 0 0         $upd_col_str .= ",$col=".( ($col eq 'last_chg_user') ? 'suser_name()' : 'getdate()');
1799             }
1800              
1801 0 0         unless ($args{InsertOnly}) {
1802 0           my $upd_merge_sql = sprintf($upd_sql,
1803             $table,
1804             $upd_col_str,
1805             $table, $stg_table,
1806             $key_col_str,
1807             );
1808 0           print("Executing: $upd_merge_sql\n");
1809              
1810 0 0         unless ($args{NoExec}) {
1811 0           my $upd_rows = $dbh->do($upd_merge_sql) + 0;
1812 0           print("$upd_rows rows updated in $table\n\n");
1813             }
1814             }
1815              
1816 0           my $del_merge_sql = sprintf($del_sql,
1817             $stg_table,
1818             $stg_table, $table,
1819             $key_col_str,
1820             );
1821 0           print("Executing: $del_merge_sql\n");
1822              
1823 0 0         unless ($args{NoExec}) {
1824 0           my $del_rows = $dbh->do($del_merge_sql) + 0;
1825 0           print("$del_rows rows deleted from $stg_table\n\n");
1826             }
1827              
1828 0           my $ins_merge_sql = sprintf($ins_sql,
1829             $field_str,
1830             $stg_table,
1831             );
1832 0           print("Inserting to $table: $ins_merge_sql\n");
1833              
1834 0 0         return 1 if $args{NoExec};
1835 0           my $ins_rows = $self->bcp_sql($table, $ins_merge_sql) + 0;
1836 0           print("$ins_rows rows inserted to $table\n\n");
1837              
1838 0           return 1;
1839             }
1840             }
1841              
1842             # BCP (via sqsh) the results of a sql select statement into a table
1843             sub bcp_sql {
1844 0     0     my $self = shift;
1845 0           my ($table,$sql) = @_;
1846              
1847 0           my $dbh = $self->{DBH};
1848 0           my $db = $dbh->{Name};
1849 0 0         $db =~ /server=(\w+)/ or confess "Can't determine server for bcp";
1850 0           my $server = $1;
1851 0           my $database = $self->curr_db();
1852              
1853 0           my $user = $dbh->{Username};
1854 0 0 0       my $bcp_table =
    0          
    0          
1855             (!$database or $table =~ /^\w+\.\w*\.\w+$/) ? $table
1856             : ($table =~ /^\w+$/) ? "$database..$table"
1857             : ($table =~ /^\w*\.\w+$/) ? "$database.$table"
1858             : confess "Can not determine database for sqsh/bcp";
1859              
1860 0           local $ENV{SQSH} = "-U $dbh->{Username} -P $self->{PASSWORD}";
1861 0           my $pid = open(my $fh, "-|");
1862 0 0         confess "Can't fork: $!" unless defined $pid;
1863 0 0         unless ($pid) {
1864              
1865             # sqsh needs library path set - make sure it is set
1866             # Don't know where it is in generic environment, or best way
1867             # to universally set this, or even if this is necessary in general...
1868             # local $ENV{LD_LIBRARY_PATH} = '/path/to/sybase/OCS-12_5/lib';
1869 0           my @cmd = (sqsh => -S => $server, -D => $database);
1870 0           my $sqsh_fh;
1871              
1872             # sqsh outputs to stderr
1873 0           open(STDERR, ">&STDOUT");
1874 0 0         unless ( open($sqsh_fh, "|-", @cmd) ) {
1875 0           warn "Unable to exec @cmd: $!";
1876 0           exit(1);
1877             }
1878 0           print $sqsh_fh "$sql\n";
1879 0           print $sqsh_fh "\\bcp -b 1000 $bcp_table\n";
1880              
1881 0           my $status = close $sqsh_fh;
1882 0 0         exit($status ? 0 : 1);
1883             }
1884 0           my $rows;
1885 0           local ($_, $.);
1886 0           my $cnt;
1887 0           while (<$fh>) {
1888 0 0         if (/^Batch successfully bulk-copied/) {
1889 0           $cnt += 1000;
1890 0 0         print "$cnt: $_" unless $cnt % 10_000;
1891 0           next;
1892             }
1893 0           print;
1894 0 0         $rows = $1 if /^\s*(\d+) rows copied/;
1895             }
1896 0           my $close_status = close $fh;
1897 0 0         confess "SQSH BCP error - no rows copied" unless defined $rows;
1898 0 0         confess "SQSH BCP error - $rows rows copied" unless $close_status;
1899              
1900             # Return true value
1901 0           return $rows;
1902             }
1903              
1904             # SQL to return table column defaults
1905             {
1906             my $sql = <
1907             SELECT c.name, d.text
1908             FROM dbo.syscolumns c, dbo.syscomments d
1909             WHERE c.id = object_id('%s')
1910             AND c.cdefault = d.id
1911             AND d.texttype = 0
1912             SQL
1913              
1914 0     0     sub default_sql { return $sql }
1915             }
1916              
1917             # Changed for Sybase v12 and multiple tempdbs
1918             sub temp_table_name {
1919 0     0     my ($self, $name) = @_;
1920              
1921 0           my $dbh = $self->{DBH};
1922 0           my ($spid) = $dbh->selectrow_array('select @@spid');
1923 0           print "SPid: $spid\n";
1924 0           my $who = $dbh->selectrow_hashref("exec sp_who '$spid'");
1925 0   0       my $tempdb = $who->{tempdbname} || 'tempdb';
1926 0           print "TempDb: $tempdb\n";
1927 0           my ($id) = $dbh->selectrow_array("select object_id('$tempdb..$name')");
1928 0           print "ID: $id\n";
1929 0           my ($real_name) = $dbh->selectrow_array("select object_name($id, db_id('$tempdb'))");
1930 0           print "RealName: $real_name\n";
1931 0           return "$tempdb..$real_name";
1932             }
1933              
1934             sub delete {
1935 0     0     my ($self, $table, $where, $limit) = @_;
1936              
1937 0           my $dbh = $self->{DBH};
1938 0   0       $dbh->{syb_rowcount} = $limit || 1000;
1939              
1940 0           my $sql = "DELETE FROM $table";
1941 0 0         $sql .= " WHERE $where" if $where;
1942              
1943 0           my ($rows, $tot_rows);
1944 0           my ($err, $err_msg);
1945              
1946 0           print "Executing: $sql\n";
1947 0           do {
1948              
1949 0           $rows = eval { $dbh->do($sql) };
  0            
1950 0 0         unless ($rows) {
1951 0           $err_msg = $@;
1952 0           $err++;
1953 0           $rows = 0;
1954             }
1955              
1956 0           $tot_rows += $rows;
1957 0 0         print "Deleted $tot_rows rows\n" if $rows > 0;
1958              
1959             } while $rows > 0;
1960              
1961 0           $dbh->{syb_rowcount} = 0;
1962              
1963 0 0         confess $err_msg if $err;
1964              
1965 0           print "$tot_rows rows deleted from $table\n";
1966 0           return $tot_rows;
1967             }
1968              
1969              
1970             package DBIx::BulkUtil::SybaseIQ;
1971              
1972 1     1   7 use Carp qw(confess);
  1         2  
  1         58  
1973 1     1   6 use Cwd qw(abs_path);
  1         2  
  1         1234  
1974              
1975             our @ISA = qw(DBIx::BulkUtil::Sybase);
1976              
1977             {
1978              
1979             my $sql = <
1980             LOAD TABLE %s
1981             (
1982             %s
1983             )
1984             FROM
1985             %s
1986             QUOTES OFF
1987             ESCAPES OFF
1988             SQL
1989              
1990             sub bcp_in {
1991 0     0     my $self = shift;
1992 0           my $table = shift;
1993              
1994 0 0         my $opts = (ref $_[-1]) ? pop @_ : {};
1995              
1996 0           my @files = @_;
1997              
1998 0 0         push @files, "$table.bcp" unless @files;
1999              
2000 0           my $dbh = $self->{DBH};
2001              
2002 0   0       my $delimiter = $opts->{Delimiter} || $self->{DELIMITER};
2003 0   0       my $row_delimiter = $opts->{RowDelimiter} || "\n";
2004              
2005 0           my $id_cnt;
2006 0   0       my $mode = $opts->{Action} || "A";
2007 0 0         if ( $mode eq 'T' ) {
    0          
2008 0           my $sql = "TRUNCATE TABLE $table";
2009 0           print "Executing: $sql\n";
2010 0           $dbh->do($sql);
2011             } elsif ($mode eq 'R') {
2012 0           my $sql = "DELETE FROM $table";
2013 0           print "Executing: $sql\n";
2014 0           $dbh->do($sql);
2015             }
2016              
2017 0           my @bcp_list;
2018 0           for my $file (@files) {
2019 0 0         confess "BCP file $file does not exist" unless -f $file;
2020 0 0         unless ( -s _ ) {
2021 0           print "$file is empty. Skipping ...\n";
2022 0           next;
2023             }
2024 0           push @bcp_list, $file;
2025             }
2026              
2027 0 0         unless ( @bcp_list ) {
2028 0           print "All files are empty. Skipping bcp of $table\n";
2029              
2030             # Make any log file parsers happy
2031 0           print "0 rows copied\n";
2032 0           return 0;
2033             }
2034              
2035 0           my $info = $self->column_info($table);
2036 0 0 0       my $col_list = ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? $opts->{ColumnList} : $info->{LIST};
2037 0 0         my @filler = $opts->{Filler} ? @{$opts->{Filler}} : ();
  0            
2038 0           my %is_filler;
2039 0           $is_filler{$_}++ for @filler;
2040              
2041             # Convert empty string to NULL
2042             # Should be default but we don't want to break existing apps
2043 0 0         my $null_blanks = $self->{NoBlankNull} ? ' NULL(BLANKS)' : '';
2044              
2045             # Columns that we will let default to the schema default
2046 0   0       my $dflt = $opts->{Default} || [];
2047 0           my %dflt; $dflt{$_}++ for @$dflt;
  0            
2048              
2049 0   0       my $constant = $opts->{Constants} || {};
2050              
2051 0   0       my @list = grep !defined($constant->{$_})&&!$dflt{$_}, @$col_list;
2052 0           my $last_col = $list[-1];
2053              
2054             # It is best to explicitly put the row delimiter on the last column
2055             my $load_sql = sprintf(
2056             $sql,
2057             $table,
2058             join( ",\n", map {
2059             defined($constant->{$_}) ? qq( [$_] DEFAULT '$constant->{$_}')
2060             : ( $_ ne $last_col )
2061             ? $is_filler{$_} ? qq( FILLER('$delimiter')) : qq( [$_] '$delimiter'$null_blanks)
2062             : ( $opts->{TrailingDelimiter} )
2063             ? $is_filler{$_} ? qq( FILLER('$delimiter$row_delimiter')) : qq( [$_] '$delimiter$row_delimiter'$null_blanks)
2064 0 0         : $is_filler{$_} ? qq( FILLER('$row_delimiter')) : qq( [$_] '$row_delimiter'$null_blanks)
    0          
    0          
    0          
    0          
    0          
2065             } grep !$dflt{$_}, @$col_list),
2066 0           join( ",\n ", map { "'". abs_path($_) . "'" } @bcp_list),
  0            
2067             );
2068              
2069 0 0         $load_sql .= "SKIP $opts->{Header}\n" if $opts->{Header};
2070              
2071             # '0' indicates unlimited errors to IQ, but will be skipped here since '0' is false
2072             # That's okay, '00' might work (it is 'true' and == 0).
2073 0 0         $load_sql .= "IGNORE CONSTRAINT ALL $opts->{MaxErrors}\n" if $opts->{MaxErrors};
2074              
2075 0           my $db = $dbh->{Name};
2076 0 0         $db =~ /server=(\w+)/ or confess "Can't determine server for bcp";
2077 0           my $server = $1;
2078 0           my $database = $self->curr_db();
2079              
2080 0           print "Loading $server/$database/$table\n";
2081 0           print "Executing: $load_sql\n";
2082 0           my $rows = $dbh->do($load_sql) + 0;
2083 0           print "$rows rows copied\n";
2084 0           return $rows;
2085             }
2086             }
2087              
2088             {
2089             my $sql = <
2090             SELECT cname, default_value
2091             FROM sys.syscolumns
2092             WHERE tname = '%s'
2093             AND default_value IS NOT NULL
2094             SQL
2095              
2096 0     0     sub default_sql { return $sql }
2097             }
2098              
2099             # Because SybaseIQ can not do sqsh
2100             sub bcp_sql {
2101 0     0     my ($self, $table, $sql) = @_;
2102 0           my $do_sql = "INSERT INTO $table\n$sql";
2103 0           $self->{DBH}->do("INSERT INTO $table\n$sql");
2104             }
2105              
2106 0     0     sub is_iq {1}
2107              
2108             sub index_info {
2109 0     0     my ( $self, $table, $all_indexes ) = @_;
2110              
2111 0           my $dbh = $self->{DBH};
2112              
2113 0           my $sql = "exec sp_iqindex [$table]";
2114 0           my $sth = $dbh->prepare($sql);
2115 0           $sth->execute();
2116 0           my @col_names = @{$sth->{NAME_lc}};
  0            
2117 0           my %row; $sth->bind_columns(\@row{@col_names});
  0            
2118 0           my %ind;
2119 0           while ($sth->fetch()) {
2120 0 0 0       next if !$all_indexes and $row{unique_index} ne 'Y';
2121 0           $ind{$row{index_name}} = [ split /,/, $row{column_name} ];
2122             }
2123              
2124 0 0         return unless %ind;
2125 0           return \%ind;
2126             }
2127              
2128             package DBIx::BulkUtil::Oracle;
2129              
2130 1     1   18 use Carp qw(confess);
  1         2  
  1         43  
2131 1     1   10 use Cwd qw(abs_path);
  1         2  
  1         7843  
2132              
2133             our @ISA = qw(DBIx::BulkUtil::Obj);
2134              
2135 0     0     sub now { 'systimestamp' }
2136              
2137             sub add {
2138 0     0     my $self = shift;
2139 0           my $date = shift;
2140 0           while (my ( $n, $unit ) = splice( @_, 0, 2 ) ) {
2141 0           $date .= " + numtodsinterval( $n, '$unit' )";
2142             }
2143 0           return $date;
2144             }
2145              
2146             {
2147             my %intervals = (
2148             year => '/ 365',
2149             month => '/ 30',
2150             hour => '* 24',
2151             minute => '* 24 * 60',
2152             second => '* 24 * 60 * 60',
2153             );
2154              
2155             sub diff {
2156 0     0     my $self = shift;
2157 0           my $date1 = shift;
2158 0           my $date2 = shift;
2159 0           my $unit = shift;
2160 0           my $diff_str = "$date2 - $date1";
2161 0 0         if (my $str = $intervals{$unit}) {
2162 0           $diff_str = "($diff_str) $str";
2163             }
2164 0           return "trunc($diff_str)";
2165             }
2166             }
2167              
2168             # This is necessary when you want to use a literal
2169             # date in a datetime calculation
2170             sub to_datetime {
2171 0     0     my $self = shift;
2172 0           my $date = shift;
2173              
2174 0           return "to_timestamp('$date', 'YYYY-MM-DD HH24:MI:SS.FF')";
2175             }
2176              
2177             # Don't need this with new version of DBI/DBD
2178             #sub to_char {
2179             # my $self = shift;
2180             # my $date = shift;
2181             # return "to_char($date, 'YYYY-MM-DD HH24:MI:SS')";
2182             #}
2183             #
2184             #sub fmt { return $_[1] }
2185              
2186             sub row_select {
2187 0     0     my $self = shift;
2188 0           my $sel = shift;
2189 0           return "select $sel from dual";
2190             }
2191              
2192             sub sp_sth {
2193 0     0     my $self = shift;
2194 0           my $sth = $self->{DBH}->prepare($self->sp_sql(@_));
2195 0           $sth->bind_param_inout(":cursor", \my $sth2, 0, { ora_type => DBD::Oracle::ORA_RSET() });
2196 0           $sth->execute();
2197 0           return $sth2;
2198             }
2199              
2200             sub sp_sql {
2201 0     0     my $self = shift;
2202 0           my ($stored_proc, @args) = @_;
2203             return
2204             "BEGIN\n$stored_proc(" .
2205 0 0         join(",", map { /^:cursor$/ ? $_ : $self->{DBH}->quote($_) } @args) .
  0            
2206             ");\nEND;\n";
2207             }
2208              
2209             {
2210              
2211             my %action_map = (
2212             A => "APPEND",
2213             R => "REPLACE",
2214             T => "TRUNCATE",
2215             );
2216              
2217             sub bcp_in {
2218 0     0     my $self = shift;
2219 0           my $opts = {};
2220 0 0         if (ref $_[-1]) {
2221 0           $opts = pop @_;
2222             }
2223 0   0       my $action_opt = uc($opts->{Action} || "A");
2224              
2225 0           my ( $table, @files ) = @_;
2226              
2227 0 0         my $partition = ( $table =~ s/:(\w+)$// ) ? $1 : '';
2228              
2229 0           my $dbh = $self->{DBH};
2230              
2231 0           my $stdin = $opts->{Stdin};
2232 0 0 0       @files = "$table.bcp" if !@files && !$stdin;
2233              
2234 0           my $has_stdin;
2235 0           for my $file (@files) {
2236 0 0         if ( $file eq "-" ) {
2237 0           $has_stdin++;
2238 0           next;
2239             }
2240 0 0         confess "BCP file $file does not exist" unless -f $file;
2241             }
2242              
2243 0 0 0       if ( $has_stdin && !$stdin ) {
    0 0        
2244 0           $stdin = \*STDIN;
2245             } elsif ( $stdin && !$has_stdin ) {
2246 0           push @files, "-";
2247             }
2248              
2249             # Save some work, skip load on empty file
2250             # Let sqlldr do a heavy handed truncate or delete
2251             # if that is the chosen action
2252 0 0         my @bcp_files = grep { $_ eq "-" or -s } @files;
  0            
2253              
2254 0 0         if ( !@bcp_files ) {
2255 0 0         if ( $action_opt eq 'A') {
2256 0           print "$files[0],... is empty. Skipping sqlldr\n";
2257              
2258             # Make any log file parsers happy
2259 0           print "0 Rows successfully loaded\n";
2260 0           return 0;
2261             }
2262              
2263             # Need some files if we run sqlldr
2264 0           @bcp_files = @files;
2265             }
2266 0           require File::Temp;
2267              
2268 0   0       my $constants = $opts->{Constants} || {};
2269 0           my %const = map { uc($_) => $constants->{$_} } keys %$constants;
  0            
2270              
2271 0   0       my $sizes = $opts->{CharSizes} || {};
2272 0           my %char_sizes = map { uc($_) => $sizes->{$_} } keys %$sizes;
  0            
2273              
2274 0   0       my $keep_temp = $opts->{KeepTempFiles} || $opts->{Debug};
2275 0   0       my $in_temp_dir = $opts->{TempDir} || $opts->{Debug};
2276 0           my $temp_dir;
2277 0 0 0       $temp_dir = $opts->{TempDir} || "." if $in_temp_dir;
2278              
2279 0 0         my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
2280 0 0         my @unlink = $keep_temp ? (UNLINK => 0) : ();
2281 0           my $ctl_fh = File::Temp->new(
2282             TEMPLATE => "${table}_XXXXX",
2283             SUFFIX => ".ctl",
2284             @temp_dir, @unlink,
2285             );
2286 0           chmod(0664, $ctl_fh->filename());
2287 0           my $bad_fh = File::Temp->new(
2288             TEMPLATE => "${table}_XXXXX",
2289             SUFFIX => ".bad",
2290             @temp_dir, @unlink,
2291             );
2292 0           chmod(0664, $bad_fh->filename());
2293 0           my $log_fh = File::Temp->new(
2294             TEMPLATE => "${table}_XXXXX",
2295             SUFFIX => ".log",
2296             @temp_dir, @unlink,
2297             );
2298 0           chmod(0664, $log_fh->filename());
2299 0 0         my $prm_fh = $stdin ? File::Temp->new(
2300             TEMPLATE => "${table}_XXXXX",
2301             SUFFIX => ".prm",
2302             @temp_dir,
2303             ) : undef;
2304              
2305             # NLS date format env variable does not work
2306             # for sqlldr.
2307             # So we must determine date fields and
2308             # specify the format in the control file.
2309 0           my $db = $self->{DBH}->{Name};
2310 0           my $user = $dbh->{Username};
2311 0           my ($schema, $tbl_name) = split /\./, uc($table);
2312 0 0         if (!$tbl_name) {
2313 0           $tbl_name = $schema;
2314 0           $schema = $self->curr_schema();
2315             }
2316              
2317 0           my $sth = $dbh->column_info(undef, $schema, $tbl_name, undef);
2318 0           my @info_names = @{$sth->{NAME_uc}};
  0            
2319 0           my %row; $sth->bind_columns(\@row{@info_names});
  0            
2320 0           my (@columns, %is_date, %char_sz, %is_lob);
2321 0 0         print "ColumnName Type Size\n" if $opts->{Debug};
2322 0 0         print "----------------\n" if $opts->{Debug};
2323 0           while ($sth->fetch()) {
2324 0           push @columns, $row{COLUMN_NAME};
2325 0 0         print "$row{COLUMN_NAME}\t$row{TYPE_NAME}\t$row{COLUMN_SIZE}\n" if $opts->{Debug};
2326 0 0         $char_sz{$row{COLUMN_NAME}} = exists($char_sizes{$row{COLUMN_NAME}}) ? $char_sizes{$row{COLUMN_NAME}} : $row{COLUMN_SIZE} if $row{TYPE_NAME} =~ /CHAR/;
    0          
2327 0 0         $char_sz{$row{COLUMN_NAME}} = exists($char_sizes{$row{COLUMN_NAME}}) ? $char_sizes{$row{COLUMN_NAME}} : 20_000_000, $is_lob{$row{COLUMN_NAME}} = 1 if $row{TYPE_NAME} =~ /TEXT|LOB|XML/;
    0          
2328 0 0         $is_date{$row{COLUMN_NAME}} = $1 if $row{TYPE_NAME} =~ /(DATE|TIMESTAMP)/;
2329             }
2330 0 0         confess("Table $schema.$tbl_name not found in database $db") unless @columns;
2331              
2332             # Find date formats in file, remove constants from column list
2333 0           my %date_fmt;
2334             my @file_columns = grep !defined($const{$_}),
2335 0 0 0       ( ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? ( map uc, @{$opts->{ColumnList}} ) : @columns );
  0            
2336 0 0         if (%is_date) {
2337 0           %date_fmt = $self->date_masks_from_file([@files], \@file_columns, \%is_date, $opts);
2338             }
2339              
2340 0 0         my $row_delim_str = $opts->{RowDelimiter} ? qq("str '$opts->{RowDelimiter}'"\n) : '';
2341              
2342 0   0       my $delimiter = $opts->{Delimiter} || $self->{DELIMITER};
2343 0   0       my $action = $action_map{$action_opt} || "APPEND";
2344 0           my $direct_load_pre = '';
2345 0           my $direct_load_post = '';
2346              
2347 0           my $sqlldr_opts = '';
2348 0   0       my $max_errors = $opts->{MaxErrors} || 0;
2349 0           $sqlldr_opts .= "ERRORS=$max_errors";
2350 0 0         $sqlldr_opts .= ", SKIP=$opts->{Header}" if $opts->{Header};
2351              
2352 0 0         if ($opts->{DirectPath}) {
2353 0 0         my $parallel = ( uc($opts->{DirectPath}) eq 'P' ) ? ", PARALLEL=TRUE" : '';
2354 0           $direct_load_pre = "OPTIONS(DIRECT=TRUE$parallel, ROWS=1000000, $sqlldr_opts)\nUNRECOVERABLE\n";
2355 0           $direct_load_post = "REENABLE DISABLED_CONSTRAINTS\n";
2356             } else {
2357 0   0       my $commit_rows = $opts->{CommitSize} || 2000;
2358 0           $direct_load_pre = "OPTIONS (ROWS=$commit_rows, BINDSIZE=5000000, READSIZE=20970000, $sqlldr_opts)\n";
2359             }
2360             my $default_date_fmt =
2361             $opts->{SybaseDateFmt} ? 'MON DD YYYY HH12:MI:SS:FF3AM'
2362             : $opts->{DateFormat} ? $opts->{DateFormat}
2363 0 0         : 'YYYY-MM-DD HH24:MI:SS.FF3'
    0          
2364             ;
2365 0           for ( keys %is_date ) {
2366 0   0       $date_fmt{$_} ||= $default_date_fmt;
2367 0 0         $is_date{$_} = 'TIMESTAMP' if $date_fmt{$_} =~ /FF|TZ[DHMR]/;
2368             }
2369             my $quote_str = $opts->{QuoteFields}
2370 0 0         ? qq( OPTIONALLY ENCLOSED BY '"')
2371             : ''
2372             ;
2373 0 0         if ( $opts->{LoadWhen} ) {
2374 0           $direct_load_post .= "WHEN $opts->{LoadWhen}\n";
2375             }
2376              
2377 0           my $nls_str = '';
2378 0 0         $nls_str = "CHARACTERSET $opts->{NLSLang}" if $opts->{NLSLang};
2379 0 0         $nls_str .= " LENGTH SEMANTICS $opts->{Semantics}" if $opts->{Semantics};
2380 0 0         $nls_str .= "\n" if $nls_str;
2381              
2382 0           my %sybase_type;
2383 0 0         @sybase_type{@file_columns} = @{$opts->{SybaseTypes}} if $opts->{SybaseTypes};
  0            
2384             # Logic for trimming or preserving blanks on char/varchar columns
2385             my $blank_control = sub {
2386 0     0     my $size = $char_sz{$_};
2387             return " $_ CHAR($size) PRESERVE BLANKS"
2388 0 0 0       if $opts->{PreserveBlanks} or $size == 1;
2389 0 0         if ( $opts->{SybaseTypes} ) {
2390             # On the off chance a Sybase char column becomes an Oracle BLOB
2391 0 0         return qq[ $_ CHAR($size)] if $is_lob{$_};
2392 0 0         return qq[ $_ CHAR($size) "NVL(RTRIM(:$_),' ')"] if $sybase_type{$_} eq 'char';
2393             } else {
2394 0 0         return qq[ $_ CHAR($size)] if $is_lob{$_};
2395 0 0         return qq[ $_ CHAR($size) "NVL(RTRIM(:$_),' ')"] if $opts->{TrimBlanks};
2396             }
2397 0           return " $_ CHAR($size)";
2398 0           };
2399              
2400 0   0       my $field_ref = $opts->{FieldRef} || {};
2401 0           my %field_ref = map { uc($_) => $field_ref->{$_} } keys %$field_ref;
  0            
2402              
2403             # Field ref columns that don't reference themselves
2404             # will be considered similar to constant columns, but they must come
2405             # last, otherwise column alignment will be off
2406 0           my %field_ref_const;
2407 0           for ( keys %field_ref ) {
2408 0 0         next if $field_ref{$_} =~ /:$_\b/i;
2409 0           $field_ref_const{$_}++;
2410             }
2411 0           @columns = ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? map uc($_), @{$opts->{ColumnList}} : (
2412 0 0 0       (grep !$field_ref_const{$_}, @columns),
2413             keys %field_ref_const,
2414             );
2415 0           my %is_filler;
2416 0 0         if ( $opts->{Filler} ) {
2417 0           $is_filler{uc($_)}++ for @{$opts->{Filler}};
  0            
2418             }
2419              
2420 0           my $file_str = join(",", @bcp_files);
2421 0           my $sqlldr_file_str = join("\n", map "INFILE '$_'", @bcp_files);
2422 0           my $disp_table = my $sqlldr_table = $table;
2423              
2424 0 0         if ($partition) {
2425 0           $sqlldr_table .= " PARTITION ($partition)";
2426 0           $disp_table .= ":$partition";
2427             }
2428              
2429             # Default charset is roman8 on HP
2430             # Must set it here
2431             printf $ctl_fh
2432             $direct_load_pre.
2433             "LOAD DATA\n".
2434             #"CHARACTERSET WE8ROMAN8\n".
2435             $nls_str.
2436             "%s\n".
2437             $row_delim_str.
2438             "INTO TABLE %s %s\n".
2439             $direct_load_post.
2440             qq(FIELDS TERMINATED BY '$delimiter'$quote_str\n).
2441             "TRAILING NULLCOLS\n".
2442             "(\n%s\n)\n",
2443             $sqlldr_file_str,
2444             $sqlldr_table,
2445             $action,
2446             join(",\n", map {
2447 0           (
2448             exists($const{$_}) ? qq[ $_ CONSTANT '$const{$_}']
2449             : exists($is_filler{$_}) ? qq[ $_ FILLER]
2450             : exists($field_ref{$_}) ? qq[ $_ "$field_ref{$_}"]
2451             : $is_date{$_} ? " $_ $is_date{$_} '$date_fmt{$_}'"
2452 0 0         : $char_sz{$_} ? $blank_control->()
    0          
    0          
    0          
    0          
2453             : " $_"
2454             )
2455             } @columns);
2456 0 0         if ($prm_fh) {
2457 0           print $prm_fh "userid=$user/$self->{PASSWORD}\@$db\n";
2458 0           close $prm_fh;
2459             }
2460 0           $_->close() for $ctl_fh, $bad_fh, $log_fh;
2461              
2462 0           print "Loading $db..$disp_table from $file_str\n";
2463              
2464 0           my $ctl_file = $ctl_fh->filename();
2465 0           my $bad_file = $bad_fh->filename();
2466 0           my $log_file = $log_fh->filename();
2467 0 0         my $prm_file = $prm_fh ? $prm_fh->filename() : undef;
2468 0 0         if ($keep_temp) {
2469 0           print "SqlldrControlFile: ", abs_path($ctl_file), "\n";
2470 0           print "SqlldrBadRowFile : ", abs_path($bad_file), "\n";
2471 0           print "SqlldrLogFile : ", abs_path($log_file), "\n";
2472             }
2473 0           local $ENV{NLS_DATE_FORMAT} = 'YYYY-MM-DD HH24:MI:SS';
2474 0           local $ENV{NLS_TIMESTAMP_FORMAT} = 'YYYY-MM-DD HH24:MI:SS.FF';
2475 0           local $ENV{NLS_TIMESTAMP_TZ_FORMAT} = 'YYYY-MM-DD HH24:MI:SS.FF';
2476              
2477 0           my @prm_opt;
2478 0 0         @prm_opt = "parfile=$prm_file" if $prm_file;
2479 0           my @cmd = (
2480             sqlldr =>
2481             "control=$ctl_file",
2482             "log=$log_file",
2483             "bad=$bad_file",
2484             @prm_opt,
2485             "silent=(header,discards,feedback,partitions)",
2486             );
2487 0 0 0       print "Executing: @cmd\n" if $opts->{Debug} || $opts->{NoExec};
2488 0 0         return "@cmd" if $opts->{NoExec};
2489              
2490 0           my $close_success;
2491              
2492             # We could do this either way with IPC::Run
2493             # But lets not require it unless necessary.
2494 0 0         if ($stdin) {
2495 0           require IPC::Run;
2496              
2497 0           $close_success = IPC::Run::run( \@cmd, '<', $stdin );
2498             } else {
2499             # Hide user/passwd from ps
2500 0 0         open(my $cmd_fh, "|-", @cmd) or confess "Could not exec sqlldr: $!";
2501 0           print $cmd_fh "$user/$self->{PASSWORD}\@$db\n";
2502              
2503             # We don't want to exit right away on failure
2504             # We want to see the log file and bad record if any
2505 0           $close_success = close $cmd_fh;
2506             }
2507              
2508             # We don't want to exit right away on failure
2509             # We want to see the log file and bad record if any
2510 0           my $exit_stat = $? >> 8;
2511 0           my $exit_sig = $? & 127;
2512 0           my $exit_core = $? & 128;
2513              
2514             # We have a limit of one rejected row. If we have a bad row
2515             # we'll just include it in the error.
2516             # Oops thats no longer true now that we have a MaxErrors option
2517             # Just show the first bad row if we allow > 1 error
2518 0           my $bad_row;
2519 0 0         if ( -s $bad_file ) {
2520 0 0         if ( $max_errors > 0 ) {
2521 0           local ($_, $.);
2522 0   0       local $/ = $opts->{RowDelimiter} || "\n";
2523 0 0         open(my $fh, "<", $bad_file) or confess "Can't open sqlldr reject file $bad_file: $!";
2524 0           $bad_row = <$fh>;
2525 0           close $fh;
2526             } else {
2527 0           warn "sqlldr error loading $file_str into $disp_table on row:\n";
2528 0           $bad_row = `cat $bad_file`;
2529             }
2530             }
2531 0 0         open(my $fh, "<", $log_file) or confess "Can't open sqlldr log $log_file: $!";
2532 0           print "Opened $log_file\n";
2533 0           local ($_, $.);
2534 0           my ( $rows, $error_rows, $failed_rows, $null_rows, $error_msg, $discontinued, $dp_errors );
2535 0           while (<$fh>) {
2536 0           print;
2537 0 0         if ( /^\s*(\d+)/ ) {
2538 0           my $tmp_rows = $1;
2539 0 0         $rows = $tmp_rows if /successfully loaded/;
2540 0 0         $error_rows = $tmp_rows if /not loaded due to data errors/;
2541 0 0         $failed_rows = $tmp_rows if /not loaded because all WHEN clauses/;
2542 0 0         $null_rows = $tmp_rows if /not loaded because all fields were null/;
2543 0           next;
2544             }
2545 0 0         if ( /^Record \d+: Rejected/ ) {
2546 0           $error_msg .= $_;
2547 0           next;
2548             }
2549 0 0         if ( /^(?:SQL\*Loader|ORA)-\d+:/ ) {
2550 0           $error_msg .= $_;
2551 0 0         $discontinued++ if /discontinued|aborted/;
2552 0           next;
2553             }
2554              
2555             # Catch direct path errors
2556 0 0         if ( /was not re-(?:enabled|validated)/ ) {
2557             # These errors do not cause non-zero exit status
2558 0           $dp_errors++;
2559 0           $error_msg .= $_;
2560 0           next;
2561             }
2562 0 0         if ( /^index \S+ was made unusable/ ) {
2563 0           $dp_errors++;
2564 0           $error_msg .= $_;
2565 0           next;
2566             }
2567              
2568             }
2569 0           close $fh;
2570              
2571 0 0 0       if (!$close_success or $dp_errors) {
2572 0   0       $error_msg ||= '';
2573 0 0 0       if ( $exit_stat != 0 or $dp_errors ) {
2574 0 0 0       if ( $exit_stat == 2 or $dp_errors ) {
2575             # Exit status 2 is just a warning
2576             # But we should consider it an error if we exceeded the max errors allowed
2577             # Or if load was discontinued for any reason
2578             # Or for any direct path errors
2579 0 0         my $first = ($max_errors > 0) ? 'first ' : '';
2580 0 0         confess "sqlldr exited with status $exit_stat [$error_msg]" if $dp_errors;
2581 0 0         confess "sqlldr exited with status $exit_stat [$error_msg] - ${first}rejected record:[$bad_row]" if $error_rows > $max_errors;
2582 0 0         confess "sqlldr exited with status $exit_stat [$error_msg]" if $discontinued;
2583             } else {
2584 0           confess "sqlldr exited with status $exit_stat [$error_msg]";
2585             }
2586             }
2587 0 0         confess "sqlldr received signal $exit_sig [$error_msg]" if $exit_sig > 0;
2588 0 0         confess "sqlldr coredumped [$error_msg]" if $exit_core;
2589             }
2590 0           return $rows;
2591             }
2592             }
2593              
2594             # Dummy method for compatibility with Sybase
2595       0     sub mk_view { }
2596              
2597             sub date_masks_from_file {
2598 0     0     my $self = shift;
2599 0           my ($files, $columns, $is_date, $opts) = @_;
2600              
2601 0 0 0       return unless $is_date and %$is_date;
2602              
2603 0   0       $opts ||= {};
2604              
2605 0   0       my $sample_rows = $opts->{DateSampleRows} || 1000;
2606 0   0       my $d = $opts->{Delimiter} || $self->{DELIMITER};
2607 0           my $rd = $opts->{RowDelimiter};
2608 0   0       my $year_mask = $opts->{Year2Mask} || 'YY';
2609              
2610 0           local ($., $_, $ARGV, *ARGV);
2611 0 0         local $/ = $rd if $rd;
2612 0           local @ARGV = @$files;
2613              
2614 0           my $row_cnt;
2615 0           my (%remaining, %got);
2616 0           $remaining{$_}++ for keys %$is_date;
2617 0           my %row;
2618 0           while (<>) {
2619 0 0 0       next if $opts->{Header} and $. <= $opts->{Header};
2620 0           chomp;
2621 0 0         @row{@$columns} = $opts->{QuoteFields} ? split_quoted( $_, $d ) : split /\Q$d/;
2622 0           for (keys %remaining) {
2623 0 0         if ( $row{$_} ) {
2624 0           delete $remaining{$_};
2625 0           $got{$_} = $row{$_};
2626 0 0         last if !%remaining;
2627             }
2628             }
2629              
2630             # If we haven't found values by now, give up
2631 0 0         last if ++$row_cnt >= $sample_rows;
2632             }
2633              
2634 0           my %fmt;
2635 0           $fmt{$_} = $self->date_mask($got{$_}, $year_mask) for keys %got;
2636              
2637 0           return %fmt;
2638             }
2639              
2640             # If we allow quoted fields, need to split correctly and
2641             # handle embedded quotes and delimiters
2642             sub split_quoted {
2643 0     0     my ($line,$d) = @_;
2644 0           my @result;
2645 0           while ( $line =~ s/\A("?)((?:""|.)*?)\1(\Q$d\E|\z)//s ) {
2646 0           my ( $q, $s,$got_d ) = ( $1, $2, $3 );
2647 0 0         $s =~ s/""/"/g if $q;
2648 0           push @result, $s;
2649 0 0         last if length($got_d) == 0;
2650             }
2651 0           return @result;
2652             }
2653              
2654             {
2655              
2656             my @mon = qw( Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec );
2657             my $mon_str = join("|", @mon);
2658             my $mon_re = qr/(?i)$mon_str/;
2659             my @months = qw( January February March April May June July August September October November December );
2660             my $month_str = join("|", @months);
2661             my $month_re = qr/(?i)$month_str/;
2662             my @days = qw( Mon Tue Wed Thu Fri Sat Sun );
2663             my $day_str = join("|", @days);
2664             my $day_re = qr/(?i)$day_str/;
2665              
2666             sub date_mask {
2667 0     0     my ($self, $str, $year2mask) = @_;
2668 0 0         return unless $str;
2669 0           local $_ = $str;
2670              
2671 0           my $fmt = '';
2672 0   0       $year2mask ||= 'YY';
2673              
2674             # YYYY-MM-DD or YYYYMMDD
2675 0 0         if ( s/^\d{4}(\D?)\d\d(\D?)\d\d// ) {
2676 0           $fmt .= "YYYY${1}MM${2}DD";
2677 0           $fmt .= time_mask();
2678             #die "Can not determine date mask for $str ($fmt)" if length($_);
2679 0 0         return if length($_);
2680 0           return $fmt;
2681             }
2682              
2683             # Allow day abbreviation (Mon Tue etc.)
2684 0 0         $fmt .= "DY " if s/^$day_re\s+//;
2685              
2686             # Jan 23 2010
2687 0 0         if ( s/^$mon_re\s+\d+// ) {
2688 0           my $end_year;
2689 0           $fmt .= "MON DD";
2690 0 0         if ( s/^\s\d{4}// ) {
    0          
2691 0           $fmt .= " YYYY";
2692             } elsif ( s/\s+\d{4}$// ) {
2693 0           $end_year++;
2694             } else {
2695             #die "Can not determine date mask for $str ($fmt)";
2696 0           return;
2697             }
2698 0           $fmt .= time_mask();
2699              
2700             #die "Can not determine date mask for $str ($fmt)" if length($_);
2701 0 0         return if length($_);
2702 0 0         $fmt .= " YYYY" if $end_year;
2703 0           return $fmt;
2704             }
2705              
2706             # January 23, 2010
2707 0 0         if ( s/^$month_re\s+\d+// ) {
2708 0           my $end_year;
2709 0           $fmt .= "MONTH DD";
2710 0 0         if ( s/^(\W?)\s\d{4}// ) {
    0          
2711 0           my $comma = $1;
2712 0           $fmt .= "$comma YYYY";
2713             } elsif ( s/\s+\d{4}$// ) {
2714 0           $end_year++;
2715             } else {
2716             #die "Can not determine date mask for $str ($fmt)";
2717 0           return;
2718             }
2719 0           $fmt .= time_mask();
2720              
2721             #die "Can not determine date mask for $str ($fmt)" if length($_);
2722 0 0         return if length($_);
2723 0 0         $fmt .= " YYYY" if $end_year;
2724 0           return $fmt;
2725             }
2726              
2727             # 02-Jan-2010
2728 0 0         if ( s/^\d\d?(\D?)$mon_re(\D?)\d{4}// ) {
2729 0           $fmt .= "DD${1}MON${2}YYYY";
2730 0           $fmt .= time_mask();
2731             #die "Can not determine date mask for $str ($fmt)" if length($_);
2732 0 0         return if length($_);
2733 0           return $fmt;
2734             }
2735              
2736             # 02-Jan-10
2737 0 0         if ( s/^\d\d?(\D?)$mon_re(\D?)\d\d?// ) {
2738 0           $fmt .= "DD${1}MON${2}$year2mask";
2739 0           $fmt .= time_mask();
2740             #die "Can not determine date mask for $str ($fmt)" if length($_);
2741 0 0         return if length($_);
2742 0           return $fmt;
2743             }
2744              
2745             # MM/DD/YYYY
2746 0 0         if ( s|^\d\d?(\D)\d\d?(\D)\d{4}|| ) {
2747 0           $fmt .= "MM${1}DD${2}YYYY";
2748 0           $fmt .= time_mask();
2749             #die "Can not determine date mask for $str ($fmt)" if length($_);
2750 0 0         return if length($_);
2751 0           return $fmt;
2752             }
2753              
2754             #die "Failure to determine date mask for $str";
2755 0           return;
2756             }
2757             }
2758              
2759             # Operates on and modifies current $_
2760             sub time_mask {
2761 0     0     my $fmt = '';
2762 0 0         if ( s/^(\D?)[\s\d]\d// ) {
2763 0           my $sep = $1;
2764 0 0         $sep = qq("$sep") if $sep =~ /\S/;
2765 0           $fmt .= "${sep}HH";
2766 0 0         $fmt .= /[AP]M\b/i ? "12" : "24";
2767 0 0         if ( s/^(\D)\d\d// ) {
2768 0           $fmt .= "${1}MI";
2769 0 0         if ( s/^(\D)\d\d// ) {
2770 0           $fmt .= "${1}SS";
2771 0 0         if ( s/^(\D)(\d+)// ) {
2772 0           $fmt .= $1 . "FF" . length($2);
2773             }
2774             }
2775             }
2776 0 0         if ( s/^(\s?)[AP]M// ) {
2777 0           $fmt .= "${1}AM";
2778             }
2779 0 0         if ( s/^(\s*)\w{2,3}T//i ) {
2780 0           $fmt .= "${1}TZD";
2781             }
2782 0 0         if ( s/^\s[+-]\d\d(\D)\d\d// ) {
2783 0           $fmt .= " TZH${1}TZM";
2784             }
2785             }
2786 0           return $fmt;
2787             }
2788              
2789             {
2790             my %type_map = ( TABLE => 'T', VIEW => 'V', PROCEDURE => 'P' );
2791              
2792             sub obj_type {
2793 0     0     my ( $self, $name ) = @_;
2794 0           $name = uc($name);
2795 0           my $type;
2796 0 0         if ( $name =~ /^([^.]+)\.(.+)/ ) {
2797 0           my ($schema, $table) = ($1, $2);
2798             $type = $self->{DBH}->selectrow_array(
2799 0           "select object_type from all_objects where owner = ? and object_name = ?",
2800             undef,
2801             $schema,
2802             $table,
2803             );
2804             } else {
2805             $type = $self->{DBH}->selectrow_array(
2806 0           "select object_type from user_objects where object_name = ?",
2807             undef,
2808             $name
2809             );
2810             }
2811 0 0         return unless $type;
2812 0   0       return $type_map{$type} || confess "Don't know about type $type for object $name";
2813             }
2814             }
2815              
2816             sub curr_schema {
2817 0     0     my $self = shift;
2818 0           return $self->get("sys_context('USERENV', 'SESSION_SCHEMA')");
2819             }
2820              
2821             {
2822             my $sql_t = <
2823             SELECT
2824             b.index_name,
2825             b.column_name
2826             FROM all_indexes a, all_ind_columns b
2827             WHERE a.owner = b.index_owner
2828             AND a.index_name = b.index_name
2829             AND a.table_owner = %s
2830             AND a.table_name = %s
2831             SQL
2832              
2833              
2834             sub index_info {
2835 0     0     my ( $self, $table, $all_indexes ) = @_;
2836              
2837 0           my $dbh = $self->{DBH};
2838 0           my ( $schema, $tbl ) = split /\./, uc($table);
2839 0 0         if ( !$tbl ) {
2840 0           $tbl = $schema;
2841 0           $schema = $self->curr_schema();
2842             }
2843 0           my $sql = sprintf $sql_t, $dbh->quote($schema), $dbh->quote($tbl);
2844 0 0         $sql .= "and a.uniqueness = 'UNIQUE'\n" unless $all_indexes;
2845 0           $sql .= "ORDER BY b.column_position\n";
2846 0           my $sth = $dbh->prepare($sql);
2847 0           $sth->execute();
2848 0           my @col_names = @{$sth->{NAME_lc}};
  0            
2849 0           my %row; $sth->bind_columns(\@row{@col_names});
  0            
2850 0           my %ind;
2851 0           while ($sth->fetch()) {
2852 0           push @{$ind{$row{index_name}}}, lc($row{column_name});
  0            
2853             }
2854 0 0         return unless %ind;
2855 0           return \%ind;
2856             }
2857             }
2858              
2859             sub primary_key {
2860 0     0     my ($self, $table) = @_;
2861 0           $table = uc($table);
2862 0           my ($schema, $tbl) = split /\./, $table;
2863 0 0         if ( !$tbl ) {
2864 0           $tbl = $schema;
2865 0           $schema = $self->curr_schema();
2866             }
2867 0           my @pk = map lc, $self->{DBH}->primary_key(undef, $schema, $tbl);
2868 0 0         return unless @pk;
2869 0           return \@pk;
2870             }
2871              
2872             {
2873              
2874             my $sql = <
2875             MERGE %s INTO %s d
2876             USING %s s
2877             ON (%s)
2878             WHEN MATCHED THEN UPDATE SET %s
2879             WHEN NOT MATCHED THEN INSERT (%s)
2880             VALUES (%s)
2881             SQL
2882              
2883             sub merge {
2884 0     0     my $self = shift;
2885 0           my %args = @_;
2886              
2887 0           my $dbh = $self->{DBH};
2888 0           my $table = $args{Table};
2889 0           my $stg_table = $args{StgTable};
2890              
2891 0           my $stg_info = $self->column_info($stg_table);
2892 0           my $stg_map = $stg_info->{MAP};
2893 0           my %stg_has; $stg_has{$_}++ for @{$stg_info->{LIST}};
  0            
  0            
2894              
2895 0 0 0       my $key_col_ref = ($args{KeyCols} && @{$args{KeyCols}}) ? $args{KeyCols} : $self->key_columns($table);
2896 0 0 0       my $upd_col_ref = ($args{UpdCols} && @{$args{UpdCols}}) ? $args{UpdCols} : $self->upd_columns($table);
2897              
2898             # Normalize all columns and maps to lowercase
2899 0           my @key_cols = map lc, @$key_col_ref;
2900 0           my @upd_cols = map lc, @$upd_col_ref;
2901 0           my @fields = (@key_cols, @upd_cols);
2902             my %col_map = $args{ColMap}
2903 0 0         ? map lc, %{$args{ColMap}}
  0            
2904             : ();
2905              
2906             my $upd_col_str = join(",", map {
2907 0           $col_map{$_} ? $stg_has{$col_map{$_}} ? "d.$_=s.$col_map{$_}" : "d.$_=$col_map{$_}"
2908 0 0         : $stg_has{$_} ? "d.$_=s.$_" : ()
    0          
    0          
2909             } @upd_cols),
2910              
2911             # Determine if last_chg_user, last_chg_date need to be updated
2912             # If staging table does not have the columns, and the target table does
2913             # Then default the values
2914             my %chg_col = $self->last_chg_list($table, \@fields);
2915 0           delete $chg_col{$_} for grep $stg_has{$_}, qw(last_chg_user last_chg_date);
2916 0           for my $col ( sort { $b cmp $a } keys %chg_col ) {
  0            
2917             $upd_col_str .= ",$col=".( ($col eq 'last_chg_user')
2918 0 0         ? "'".uc(substr($dbh->{Username}, 0, $chg_col{$col}))."'"
2919             : 'SYSTIMESTAMP'
2920             );
2921             }
2922              
2923 0 0         my $parallel = $args{Parallel} ? '/* parallel(8) append */' : '';
2924              
2925             my $merge_sql = sprintf($sql,
2926             $parallel,
2927             $table,
2928             $args{MergeFilter} ? "$args{StgTable} WHERE $args{MergeFilter}": $args{StgTable},
2929             join(" AND ", map "d.$_=s.".($col_map{$_}||$_), @key_cols),
2930             $upd_col_str,
2931             join(",", @fields),
2932             join(",", map {
2933 0 0 0       $col_map{$_} ? $stg_has{$col_map{$_}} ? "s.$col_map{$_}" : $col_map{$_}
2934 0 0         : $stg_has{$_} ? "s.$_" : "NULL"
    0          
    0          
2935             } @fields),
2936             );
2937              
2938             # No update if no update columns
2939 0 0         $merge_sql =~ s/^WHEN MATCHED.*\n//m unless @upd_cols;
2940 0           print("Executing: $merge_sql\n");
2941 0 0         return 1 if $args{NoExec};
2942              
2943 0 0         $dbh->do("ALTER SESSION ENABLE PARALLEL DML") if $args{Parallel};
2944              
2945 0           my $rows = $dbh->do($merge_sql) + 0;
2946 0           print("$rows rows updated/inserted\n\n");
2947 0           return $rows;
2948              
2949             }
2950             }
2951              
2952             # #!!!UNFINISHED!!!
2953             # Static block for mk_ext_table
2954             {
2955              
2956             my $sql = <
2957             CREATE TABLE %s (
2958             %s
2959             )
2960             ORGANIZATION EXTERNAL (
2961             TYPE oracle_loader
2962             DEFAULT DIRECTORY %s
2963             ACCESS PARAMETERS
2964             (
2965             RECORDS DELIMITED BY NEWLINE
2966             LOGFILE 'TEST.log'
2967             FIELDS TERMINATED BY '%s'
2968             )
2969             LOCATION ('%s')
2970             )
2971             SQL
2972              
2973             sub mk_ext_table {
2974 0     0     my $self = shift;
2975              
2976 0           my %args = @_;
2977              
2978 0 0         my $table = $args{Table} or confess "Need table prototype for external table";
2979              
2980 0   0       my $ext_table = $args{Name} || "ext_${table}$$";
2981 0 0         my $dir = $args{Dir} or confess "Need directory for external table $table";
2982 0 0         my $file = $args{File} or confess "Need file for external table $table";
2983              
2984 0           my $cols = $self->column_info($table);
2985 0           my $cmap = $cols->{MAP};
2986              
2987 0           my @col_list;
2988 0           for my $col (@{$cols->{LIST}}) {
  0            
2989 0           my $col_str = $col;
2990              
2991 0           my $cdata = $cmap->{$col};
2992 0           my $type = $cdata->{TYPE_NAME};
2993 0           my $dec = $cdata->{DECIMAL_DIGITS};
2994              
2995 0           $col_str .= " $type";
2996 0           my $size = $cdata->{COLUMN_SIZE};
2997              
2998 0           for ($type) {
2999 0 0         $col_str .=
    0          
    0          
3000             /CHAR/ ? "($size)"
3001             : /NUMBER/ ? (defined $dec) ? "($size,$dec)" : ''
3002             : '';
3003             }
3004              
3005             #$col_str .= " DEFAULT $cdata->{COLUMN_DEF}" if defined $cdata->{COLUMN_DEF};
3006             #$col_str =~ s/\s+$//;
3007             #$col_str .= " NOT NULL" unless $cdata->{NULLABLE};
3008              
3009 0           push @col_list, $col_str;
3010             }
3011              
3012             my $create_sql = sprintf($sql,
3013             $ext_table,
3014             join(",\n", @col_list ),
3015             $dir,
3016             #$args{RowDelimiter} || "\\n",
3017 0   0       $args{Delimiter} || "|",
3018             $file,
3019             );
3020              
3021 0           $self->{DBH}->do($create_sql);
3022              
3023 0           return $ext_table;
3024             }
3025             }
3026              
3027             package DBIx::BulkUtil::Release;
3028              
3029             sub new {
3030 0     0     my ($class, $f) = @_;
3031 0           bless $f, $class;
3032             }
3033              
3034 0     0     sub DESTROY { $_[0]->() }
3035              
3036             1;
3037              
3038             __END__