File Coverage

blib/lib/Dimedis/SqlDriver/Pg.pm
Criterion Covered Total %
statement 21 235 8.9
branch 0 114 0.0
condition 0 20 0.0
subroutine 7 22 31.8
pod 0 15 0.0
total 28 406 6.9


line stmt bran cond sub pod time code
1             package Dimedis::SqlDriver::Pg;
2              
3 1     1   932 use strict;
  1         3  
  1         55  
4 1     1   6 use vars qw($VERSION @ISA);
  1         2  
  1         72  
5              
6             $VERSION = '0.02';
7             @ISA = qw(Dimedis::Sql);
8              
9 1     1   6 use Carp;
  1         1  
  1         71  
10 1     1   6 use File::Copy;
  1         2  
  1         49  
11 1     1   6 use File::Basename;
  1         2  
  1         69  
12 1     1   6 use FileHandle;
  1         2  
  1         9  
13              
14             my $exc = "Dimedis::SqlDriver::Pg:"; # Exception Prefix
15              
16             # offizielles Dimedis::SqlDriver Interface ===========================
17              
18             # insert -------------------------------------------------------------
19              
20             sub db_insert {
21 0     0 0   my $self = shift;
22              
23 0           my ($par)= @_;
24 0           $par->{db_action} = "insert";
25            
26 0           $self->db_insert_or_update ($par);
27             }
28              
29             # update -------------------------------------------------------------
30              
31             sub db_update {
32 0     0 0   my $self = shift;
33              
34 0           my ($par)= @_;
35 0           $par->{db_action} = "update";
36            
37 0           $self->db_insert_or_update ($par);
38             }
39              
40             # blob_read ----------------------------------------------------------
41              
42             sub db_blob_read {
43 0     0 0   my $self = shift;
44 0           my ($par) = @_;
45 0           my ($filehandle, $filename, $table, $col, $where, $params) =
46             @$par{'filehandle','filename','table','col','where','params'};
47              
48 0           my $dbh = $self->{dbh};
49              
50             # sind wir im AutoCommit Modus? Blob Handling bei Pg MUSS in
51             # einer Transaktion gemacht werden
52 0           my $autocommit = $dbh->{AutoCommit};
53            
54 0 0         if ( $autocommit ) {
55 0           $dbh->{AutoCommit} = 0;
56 0 0         $self->{debug} && print STDERR "$exc:blob_read: AutoCommit abgeschaltet\n";
57             }
58              
59             # die folgenden Operationen können Exceptions werfen. In jedem
60             # Fall muß aber der Transaktionsmodus wieder hergestellt werden,
61             # deshalb müssen diese in einem eval stehen.
62              
63 0           my $blob;
64 0           eval {
65             # oid des Blobs lesen
66 0           my ($oid) = $self->get (
67             sql => "select $col
68             from $table
69             where $where",
70             params => $params
71             );
72              
73 0 0         $self->{debug} && print STDERR "$exc:blob_read: blob oid $oid\n";
74              
75 0 0 0       croak "no_blob" if not $oid or $oid < 0;
76              
77             # Blob öffnen
78 0           my $lo_fd = $dbh->func($oid, $dbh->{pg_INV_READ}, 'lo_open');
79 0 0         croak "Can't open blob with oid $oid" if not defined $lo_fd;
80              
81 0 0         $self->{debug} && print STDERR "$exc:blob_read: AutoCommit abgeschaltet\n";
82              
83 0           my ($buffer, $len);
84              
85 0 0         if ( $filehandle ) {
    0          
86             # Blob in Filhandle schreiben
87 0           while ( $len = $dbh->func ($lo_fd, $buffer, 4096, 'lo_read' ) ) {
88 0 0         croak "Can't read from blob with oid $oid"
89             if not defined $len;
90 0           write ($filehandle, $buffer, $len);
91             }
92              
93             } elsif ( $filename ) {
94             # Blob in eine Datei schreiben
95 0           my $fh = FileHandle->new;
96 0 0         open ($fh, "> $filename") or croak "Can't write $filename";
97 0           while ( $len = $dbh->func ($lo_fd, $buffer, 4096, 'lo_read' ) ) {
98 0 0         croak "Can't read from blob with oid $oid"
99             if not defined $len;
100 0           write ($fh, $buffer, $len);
101             }
102 0           close $fh;
103              
104             } else {
105             # Blob in den Speicher lesen
106 0           while ( $len = $dbh->func ($lo_fd, $buffer, 4096, 'lo_read' ) ) {
107 0 0         croak "Can't read from blob with oid $oid"
108             if not defined $len;
109 0           $blob .= $buffer;
110             }
111             }
112              
113 0           my $rc = $dbh->func($lo_fd, 'lo_close');
114 0 0         croak "Can't close blob with oid $oid" if not $rc;
115             };
116            
117             # evtl. Exception speichern
118 0           my $error = $@;
119            
120             # Die no_blob Exception ist hier kein Fehlerfall
121 0 0         $error = undef if $@ =~ /^no_blob/;
122              
123             # Hatten wir AutoCommit abgeschaltet und Transaktionsmodus
124             # eingeschaltet? Dann committen wir hier und schalten
125             # AutoCommit wieder ein.
126 0 0         if ( $autocommit ) {
127 0 0         if ( $error ) {
128 0           $dbh->rollback;
129             } else {
130 0           $dbh->commit;
131             }
132 0           $dbh->{AutoCommit} = 1;
133 0 0         $self->{debug} && print STDERR "$exc:blob_read: AutoCommit eingeschaltet\n";
134             }
135              
136             # Exception weiterreichen
137 0 0         croak $error if $error;
138              
139 0 0 0       return if $filehandle or $filename;
140 0           return \$blob;
141             }
142              
143             # left_outer_join ----------------------------------------------------
144             {
145             my $from;
146             my $where;
147              
148             sub db_left_outer_join {
149 0     0 0   my $self = shift;
150            
151             # static Variablen initialisieren
152            
153 0           $from = "";
154 0           $where = "";
155              
156             # Rekursionsmethode anwerfen
157              
158 0           $self->db_left_outer_join_rec ( @_ );
159            
160             # Dreck bereinigen
161              
162 0           $from =~ s/,$//;
163 0           $from =~ s/,\)/)/g;
164 0           $where =~ s/ AND $//;
165              
166 0 0         $where = '1=1' if $where eq '';
167              
168 0           return ($from, $where);
169             }
170              
171             sub db_left_outer_join_rec {
172 0     0 0   my $self = shift;
173              
174 0           my ($lref, $left_table_out) = @_;
175            
176             # linke Tabelle in die FROM Zeile
177              
178 0 0         $from .= " ".$lref->[0]
179             if not $left_table_out;
180            
181 0 0         if ( ref $lref->[1] ) {
182             # aha, Outer Join
183 0 0         if ( @{$lref->[1]} > 1 ) {
  0            
184             # kein einfacher Outer Join
185             # (verschachtelt oder outer join gegen
186             # simple join, Fall II/III)
187              
188 0           $from .= " left outer join ".$lref->[1]->[0].
189             " on ".$lref->[2];
190              
191 0           $self->db_left_outer_join_rec ($lref->[1], 1);
192              
193             } else {
194             # Fall I, outer join einer linken Tabelle
195             # gegen eine oder mehrere rechte Tabellen
196 0           my $i = 1;
197 0           while ($i < @{$lref}) {
  0            
198 0           $from .= " left outer join ".$lref->[$i]->[0].
199             " on ".$lref->[$i+1];
200 0           $i += 2;
201             }
202             }
203             } else {
204             # noe, kein Outer join
205 0           croak "$exc:db_left_outer_join\tcase III does not exist anymore";
206 0           $from .= $lref->[1];
207 0           $where .= $lref->[2]." AND ";
208             }
209             }
210             }
211              
212             # cmpi ---------------------------------------------------------------
213              
214             sub db_cmpi {
215 0     0 0   my $self = shift;
216 0           my ($par)= @_;
217              
218 1     1   1762 use locale;
  1         3  
  1         7  
219              
220 0           return "lower($par->{col}) $par->{op} ".
221             $self->{dbh}->quote (lc($par->{val}));
222             }
223              
224             # use_db -------------------------------------------------------------
225              
226             sub db_use_db {
227 0     0 0   my $self = shift;
228              
229 0           croak "use_db not implemented";
230             }
231              
232             # db_prefix ----------------------------------------------------------
233              
234             sub db_db_prefix {
235 0     0 0   my $self = shift;
236              
237 0           croak "db_prefix not implemented";
238             }
239              
240             # install ------------------------------------------------------------
241              
242             sub db_install {
243             # nichts zu tun hier, für PostgreSQL
244 0     0 0   1;
245             }
246              
247             # contains -----------------------------------------------------------
248              
249             sub db_contains {
250 0     0 0   my $self = shift;
251              
252 0           return;
253             }
254              
255             # get_features -------------------------------------------------------
256              
257             sub db_get_features {
258 0     0 0   my $self = shift;
259            
260             return {
261 0           serial => 1,
262             blob_read => 1,
263             blob_write => 1,
264             left_outer_join => {
265             simple => 1,
266             nested => 1
267             },
268             cmpi => 1,
269             contains => 0,
270             };
271             }
272              
273             # Driverspezifische Hilfsmethoden ====================================
274              
275             # join Bedingung bauen -----------------------------------------------
276              
277             sub db_join_cond {
278 0     0 0   my $self = shift;
279            
280 0           my ($left, $cond, $right, $join) = @_;
281            
282             # beim outer join müssen (+) Zeichen bei Ausdrücken der
283             # rechten Tabelle angehängt werden
284            
285 0           my ($table, $alias) = split (/\s/, $right);
286 0   0       $alias ||= $table;
287            
288 0           $cond =~ s/($alias\.[^\s]+)/$1 (+)/g;
289            
290 0           return $cond;
291             }
292              
293             # Insert bzw. Update durchführen -------------------------------------
294              
295             sub db_insert_or_update {
296 0     0 0   my $self = shift;
297            
298 0           my ($par) = @_;
299 0           my $type_href = $par->{type};
300              
301 0           my $serial; # evtl. Serial Wert
302 0           my (@columns, @values); # Spaltennamen und -werte
303 0           my $return_value; # serial bei insert,
304             # modified bei update
305            
306             # Parameter aufbereiten
307              
308 0           my ($col, $val);
309 0           my $qm; # Fragezeichen für Parameterbinding
310 0           my %blobs; # Hier werden BLOB Spalten abgelegt, die
311             # nach dem INSERT eingefügt werden
312 0           my $blob_found;
313 0           my $primary_key; # Name der primary key Spalte
314            
315 0           while ( ($col,$val) = each %{$par->{data}} ) {
  0            
316 0           my $type = $type_href->{$col};
317 0           $type =~ s/\[.*//;
318              
319 0 0 0       if ( $type eq 'serial' ) {
    0          
320             # serial Typ bearbeiten
321              
322 0 0         if ( not defined $val ) {
323 0           $serial = $self->db_get_serial (
324             $par->{table},
325             $col,
326             $type_href->{$col}
327             );
328             } else {
329 0           $serial = $val;
330             }
331            
332 0           push @columns, $col;
333 0           push @values, $serial;
334 0           $qm .= "?,";
335 0           $primary_key = $col;
336            
337             } elsif ( $type eq 'blob' or $type eq 'clob' ) {
338              
339             # Blobs werden nach dem INSERT/UPDATE verarbeitet
340              
341 0 0         if ( $par->{db_action} eq 'insert' ) {
342 0           push @columns, $col;
343 0           $qm .= "-1, ";
344             }
345              
346 0           $blob_found = 1;
347 0           $blobs{$col} = $val;
348              
349             } else {
350             # alle übrigen Typen werden as is eingefügt
351 0           push @columns, $col;
352 0           push @values, $val;
353 0           $qm .= "?,";
354             }
355             }
356 0           $qm =~ s/,$//; # letztes Komma bügeln
357            
358             # Insert oder Update durchführen
359            
360 0 0         if ( $par->{db_action} eq 'insert' ) {
361             # insert ausführen
362 0           $self->do (
363             sql => "insert into $par->{table} (".
364             join (",",@columns).
365             ") values ($qm)",
366             params => \@values
367             );
368 0           $return_value = $serial;
369             } else {
370             # Parameter der where Klausel in @value pushen
371 0           push @values, @{$par->{params}};
  0            
372            
373             # update ausführen, wenn columns da sind
374             # (bei einem reinen BLOB updated passiert es,
375             # daß keine 'normalen' Spalten upgedated werden)
376            
377 0 0         if ( @columns ) {
378 0           $return_value = $self->do (
379             sql => "update $par->{table} set ".
380             join(",", map("$_=?", @columns)).
381             " where $par->{where}",
382             params => \@values
383             );
384             }
385             }
386              
387             # nun evtl. BLOBs verarbeiten
388            
389 0 0         if ( $blob_found ) {
390 0 0         if ( $par->{db_action} eq 'insert' ) {
391 0           while ( ($col,$val) = each %blobs ) {
392 0           $self->db_put_blob (
393             $par->{table},
394             "$primary_key=$serial",
395             $col, $val,
396             $type_href
397             );
398             }
399             } else {
400 0           while ( ($col,$val) = each %blobs ) {
401 0           $self->db_put_blob (
402             $par->{table},
403             $par->{where},
404             $col, $val,
405             $type_href,
406             $par->{params}
407             );
408             }
409             }
410             }
411              
412 0 0         $self->{debug} && print STDERR "$exc:insert_or_update: return_value=$return_value\n";
413              
414 0           return $return_value;
415             }
416              
417             # Serial ermitteln ---------------------------------------------------
418              
419             sub db_get_serial {
420 0     0 0   my $self = shift;
421            
422 0           my ($table, $col, $type) = @_;
423            
424             # SEQUENCE Namen bestimmen
425            
426 0   0       my $sequence ||= "${table}_SEQ";
427            
428             # Sequence auslesen
429              
430 0           my $serial;
431            
432 0           eval {
433 0           ($serial) = $self->get (
434             sql => "select nextval(?)",
435             cache => 1,
436             params => [ $sequence ],
437             );
438             };
439            
440             # wenn's nicht geklappt hat, gab's die SEQUENCE wohl nicht
441            
442 0 0         if ( $@ ) {
443 0 0         $self->{debug} && print STDERR "$exc:get_serial: sequence existiert nicht\n";
444             # also: legen wir sie doch einfach an!
445            
446 0           my ($max_id) = $self->get (
447             sql => "select max($col) from $table"
448             );
449              
450 0 0         $self->{debug} && print STDERR "$exc:get_serial: max_id=$max_id\n";
451              
452 0           $max_id += 100;
453              
454 0 0         $self->{debug} && print STDERR "$exc:get_serial: create sequence mit start=$max_id\n";
455              
456 0           $self->do (
457             sql => "create sequence $sequence
458             start $max_id
459             increment 1"
460             );
461 0           $serial = $max_id-1;
462             }
463            
464 0           return $serial;
465             }
466              
467             # BLOB speichern -----------------------------------------------------
468              
469             sub db_put_blob {
470 0     0 0   my $self = shift;
471 0           my ($table, $where, $col, $val, $type_href, $param_lref) = @_;
472              
473 0   0       $param_lref ||= [];
474              
475 0           my $dbh = $self->{dbh};
476              
477             # sind wir im AutoCommit Modus? Blob Handling bei Pg MUSS in
478             # einer Transaktion gemacht werden
479 0           my $autocommit = $dbh->{AutoCommit};
480            
481 0 0         if ( $autocommit ) {
482 0           $dbh->{AutoCommit} = 0;
483 0 0         $self->{debug} && print STDERR "$exc:put_blob: AutoCommit abgeschaltet\n";
484             }
485              
486             # die folgenden Operationen können Exceptions werfen. In jedem
487             # Fall muß aber der Transaktionsmodus wieder hergestellt werden,
488             # deshalb müssen diese in einem eval stehen.
489              
490 0           eval {
491             # 1. Prüfen, ob's da schon einen Blob gibt
492 0           my ($oid) = $self->get (
493             sql => "select $col
494             from $table
495             where $where",
496             params => $param_lref,
497             );
498              
499 0 0         $self->{debug} && print STDERR "$exc:put_blob: oid=$oid\n";
500              
501 0 0         if ( $oid != -1 ) {
502             # alten Blob löschen
503 0           my $rc = $dbh->func ($oid, 'lo_unlink');
504 0 0         croak "can't delete old blob oid=$oid" if not $rc;
505 0 0         $self->{debug} && print STDERR "$exc:put_blob: alten blob gelöscht oid=$oid\n";
506             }
507              
508 0           $oid = $dbh->func($dbh->{pg_INV_WRITE}, 'lo_creat');
509 0 0         croak "Can't create blob" if not defined $oid;
510              
511 0           $self->do (
512             sql => "update $table set $col = ? where $where",
513 0           params => [ $oid, @{$param_lref} ]
514             );
515              
516 0 0         $self->{debug} && print STDERR "$exc:put_blob: neuen blob erzeugt. oid=$oid\n";
517              
518             # nun den Blob zum Schreiben öffnen
519 0           my $lo_fd = $dbh->func ($oid, $dbh->{pg_INV_WRITE}, 'lo_open');
520 0 0         croak "Can't open blob for update with oid $oid" if not defined $lo_fd;
521              
522             # und nun schreiben wir den Burschen
523 0 0 0       if ( ref $val and ref $val ne 'SCALAR' ) {
    0          
524             # Referenz und zwar keine Scalarreferenz
525             # => das ist ein Filehandle
526             # => reinlesen den Kram
527 0           binmode $val;
528 0           my ($buffer, $rc);
529 0           while ( read ($val, $buffer, 4096) ) {
530 0           $rc = $dbh->func($lo_fd, $buffer, 4096, 'lo_write');
531 0 0         croak "Can't write data to blob with oid $oid"
532             if not defined $rc;
533             }
534              
535             } elsif ( not ref $val ) {
536             # keine Referenz
537             # => Dateiname
538             # => reinlesen den Kram
539 0           my $fh = new FileHandle;
540 0 0         open ($fh, $val) or croak "can't open file '$val'";
541 0           binmode $fh;
542 0           my ($buffer, $rc, $len);
543 0           while ( $len = read ($fh, $buffer, 4096) ) {
544 0           $rc = $dbh->func($lo_fd, $buffer, $len, 'lo_write');
545 0 0         croak "Can't write data to blob with oid $oid"
546             if not defined $rc;
547             }
548 0           close $fh;
549              
550             } else {
551             # andernfalls ist val eine Skalarreferenz mit dem Blob
552 0           my $len = length($$val);
553 0           my $i = 0;
554 0           my $rc;
555             my $chunk_len;
556 0           while ( $i < $len ) {
557 0           $chunk_len = $i+4096;
558 0 0         if ( $chunk_len > $len ) {
559 0           $chunk_len = $len - $i;
560             } else {
561 0           $chunk_len = 4096;
562             }
563 0           $rc = $dbh->func (
564             $lo_fd,
565             substr($$val, $i, $i + $chunk_len),
566             $chunk_len,
567             'lo_write'
568             );
569 0 0         croak "Can't write data to blob with oid $oid"
570             if not defined $rc;
571 0           $i += 4096;
572             }
573             }
574              
575             # Blob schließen
576 0           my $rc = $dbh->func($lo_fd,'lo_close');
577 0 0         croak "Can't close blob with oid $oid" if not $rc;
578             };
579            
580             # evtl. Exception speichern
581 0           my $error = $@;
582            
583             # Hatten wir AutoCommit abgeschaltet und Transaktionsmodus
584             # eingeschaltet? Dann committen wir hier und schalten
585             # AutoCommit wieder ein.
586 0 0         if ( $autocommit ) {
587 0 0         if ( $error ) {
588 0           $dbh->rollback;
589             } else {
590 0           $dbh->commit;
591             }
592 0           $dbh->{AutoCommit} = 1;
593 0 0         $self->{debug} && print STDERR "$exc:put_blob: AutoCommit eingeschaltet\n";
594             }
595              
596             # Exception weiterreichen
597 0 0         croak $error if $error;
598              
599 0           return 1;
600             }
601              
602             1;
603              
604             __END__