File Coverage

blib/lib/DBIx/PgLink/Adapter/Pg.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package DBIx::PgLink::Adapter::Pg;
2              
3 4     4   113082 use Moose;
  0            
  0            
4             use Data::Dumper;
5             use Memoize;
6              
7             extends 'DBIx::PgLink::Adapter';
8              
9              
# Capability flags inherited from DBIx::PgLink::Adapter. The leading '+'
# re-declares each inherited attribute, changing only its default value.
has '+are_transactions_supported' => ( default => 1 );
has '+are_routines_supported'     => ( default => 1 );
has '+routine_can_be_overloaded'  => ( default => 1 );
has '+require_parameter_type'     => ( default => 0 );
14              
15              
# Quote a value as an SQL literal.
#
# The inherited quote() does the actual quoting. When the server version is
# 8.1 or later and the quoted text contains a backslash, the literal is
# prefixed with 'E' so that it works with any 'standard_conforming_strings'
# setting. An undefined result from the parent is passed through unchanged.
override 'quote' => sub {
    my ($self) = @_;

    my $quoted = super();

    my $needs_escape_prefix =
           $self->dbh->{pg_server_version} >= 80100
        && defined $quoted
        && $quoted =~ /\\/;

    return $needs_escape_prefix ? 'E' . $quoted : $quoted;
};
24              
25              
# Guard every query-taking method against the empty statement.
#
# DBD::Pg/libpq core dumps when an empty query ('') is executed, even though
# prepare returns a valid DBI::st. do() is immune, and a query consisting of
# a single space is fine, so '' is replaced by ' ' before delegating.
around qw(
    selectrow_array    selectrow_arrayref selectrow_hashref
    selectall_arrayref selectall_hashref  selectcol_arrayref
    prepare            prepare_cached
) => sub {
    # Remove the three leading arguments; the rest stay in @_ and are
    # forwarded untouched.
    my ($next, $self, $query) = splice @_, 0, 3;

    if ($query eq '') {
        $query = ' ';
    }

    return $next->($self, $query, @_);
};
40              
41              
# Report whether the connection is currently inside a transaction.
#
# Returns true when ping() yields 3, which (as a DBD::Pg extension) means
# "database is idle within a transaction".
sub is_transaction_active {
    my ($self) = @_;

    my $ping_status = $self->ping;

    return $ping_status == 3;
}
46              
47              
48             # for Reconnect role
# Decide whether an exception means the server connection has been lost.
#
# Parameters:
#   $exception - the error (message text) raised by the failed operation
#
# Returns true when the message matches one of the known libpq
# "connection lost" texts, or when the handle's SQLSTATE belongs to
# class 08 (Connection Exception).
sub is_disconnected {
    my ($self, $exception) = @_;

    # WARNING: this doesn't work with localized libpq messages
    for my $lost_connection_re (
        qr/server closed the connection unexpectedly/i,
        qr/terminating connection/,
        qr/no connection to the server/,
    ) {
        return 1 if $exception =~ $lost_connection_re;
    }

    # SQLSTATE code: Class 08 - Connection Exception
    # (first char can be 'S' or '0')
    return $self->dbh->state =~ /^.8...$/;
}
59              
60              
# Lazily prepared statement that looks up the type id (pg_attribute.atttypid)
# of a single column. Bind parameters, in order: schema name, relation name,
# column name.
has 'pg_column_type_id_sth' => (
    is      => 'ro',
    isa     => 'Object',
    lazy    => 1,
    default => sub {
        my ($self) = @_;

        my $sql = <<'END_OF_SQL';
SELECT
  a.atttypid as pg_type_id
FROM
  pg_catalog.pg_class c
  JOIN pg_catalog.pg_namespace ns on ns.oid = c.relnamespace
  JOIN pg_catalog.pg_attribute as a on a.attrelid = c.oid
WHERE ns.nspname = ?
  and c.relname = ?
  and a.attname = ?
END_OF_SQL

        return $self->prepare($sql);
    },
);
80              
# Lazily prepared statement that fetches the pg_type row for one type,
# together with its schema name and its formatted native type name.
# Bind parameter: the type oid.
has 'pg_type_sth' => (
    is      => 'ro',
    isa     => 'Object',
    lazy    => 1,
    default => sub {
        my ($self) = @_;

        my $sql = <<'END_OF_SQL';
SELECT
  t.oid,
  t.typname,
  t.typtype,
  t.typrelid,
  t.typelem,
  t.typbasetype,
  t.typnotnull,
  ns.nspname as type_schema,
  pg_catalog.format_type(t.oid, NULL) as native_type_name
FROM pg_catalog.pg_type t
JOIN pg_catalog.pg_namespace ns ON ns.oid = t.typnamespace
WHERE t.oid = ?
END_OF_SQL

        return $self->prepare($sql);
    },
);
104              
# Fetch the type description row for a type oid.
#
# Parameters:
#   $type_id - oid of the type to look up
#
# Returns the row produced by the pg_type_sth statement as a hash reference
# (keys such as typname, typtype, typbasetype, type_schema,
# native_type_name), or undef when no row is returned.
sub pg_type_by_id {
    my ($self, $type_id) = @_;

    my $sth = $self->pg_type_sth;
    $sth->execute($type_id);
    my $type_row = $sth->fetchrow_hashref;

    # Only one row is ever read, so release the cursor explicitly instead of
    # leaving the shared, cached handle active until its next execute.
    $sth->finish;

    return $type_row;
}
110              
111              
# Resolve a type oid to the name of its underlying base type.
#
# Parameters:
#   $type_id - oid of the type to resolve
#
# Returns:
#   - 'TEXT' for a composite type (typtype 'c'), which is coerced to text;
#   - the resolved base type of typbasetype for a domain (typtype 'd'),
#     followed recursively because a domain can be built on a base type or
#     on another domain;
#   - the type's own native_type_name otherwise;
#   - undef when no type row is found for $type_id.
#
# The result is always exactly one scalar, never an empty list, because
# callers use it directly as a hash value.
sub pg_base_type {
    my ($self, $type_id) = @_;

    my $type_row = $self->pg_type_by_id($type_id);

    # Unknown type id: answer undef without inspecting a missing row.
    my $base_name;
    return $base_name unless $type_row;

    $base_name = $type_row->{native_type_name};

    my $type_kind = defined $type_row->{typtype} ? $type_row->{typtype} : '';

    if ($type_kind eq 'c') {
        $base_name = 'TEXT';    # coerce composite type to text
    }
    elsif ($type_kind eq 'd') {
        # domain can be built on base type or another domain
        $base_name = $self->pg_base_type( $type_row->{typbasetype} );
    }

    return $base_name;
}

# Results are cached by Memoize, keyed on the stringified argument list
# (the object and the type id).
memoize 'pg_base_type';
129              
130              
# Name of the database this adapter is connected to, as reported by the
# server's current_database() function.
sub current_database {
    my ($self) = @_;

    my $sql = 'SELECT current_database()';

    return $self->selectrow_array($sql);
}

# Cached: the value cannot change without a disconnect.
memoize 'current_database';
137              
# Normalise a table_info row before the inherited expansion runs.
#
# TABLE_NAME and TABLE_SCHEM are passed through unquote_identifier()
# (workaround noted by the original author as "bug: quoted identifier"),
# and TABLE_CAT is set to the current database name. The row is modified in
# place; the return value is whatever the wrapped method returns.
around 'expand_table_info' => sub {
    my ($next, $self, $info) = @_;

    foreach my $key (qw(TABLE_NAME TABLE_SCHEM)) {
        $info->{$key} = $self->unquote_identifier( $info->{$key} );
    }

    $info->{TABLE_CAT} = $self->current_database;

    return $next->($self, $info);
};
149              
# Post-process a column_info row after the inherited expansion has accepted it.
#
# Parameters:
#   $info - hash reference describing one column; modified in place
#
# Returns 0 when the wrapped method rejects the row, 1 otherwise.
#
# Steps performed on an accepted row:
#   * strip identifier quoting from TABLE_NAME, TABLE_SCHEM and COLUMN_NAME;
#   * repair COLUMN_SIZE / DECIMAL_DIGITS for numeric columns;
#   * look up pg_type_id when the row does not carry one;
#   * set native_type_name (from pg_type, when present) and base_type_name;
#   * repair COLUMN_SIZE for bit columns;
#   * request the 'to_pg_bytea' conversion for bytea columns.
#
# Missing (undef) fields are tolerated: rows built from a catalog query may
# carry NULL for TYPE_NAME/COLUMN_SIZE and no pg_type key at all.
around 'expand_column_info' => sub {
    my ($next, $self, $info) = @_;

    $next->($self, $info) or return 0;

    # bug: quoted identifier
    for my $key (qw/TABLE_NAME TABLE_SCHEM COLUMN_NAME/) {
        $info->{$key} = $self->unquote_identifier( $info->{$key} );
    }

    # bug in DBD::Pg 1.49
    # for numeric column returns 'n,m' in COLUMN_SIZE and undef in DECIMAL_DIGITS
    my $type_name   = defined $info->{TYPE_NAME}   ? $info->{TYPE_NAME}   : '';
    my $column_size = defined $info->{COLUMN_SIZE} ? $info->{COLUMN_SIZE} : '';
    if ($type_name =~ /numeric|decimal/i && $column_size =~ /\d+,\d+/) {
        # Precision and scale come from the "(p,s)" suffix of pg_type; both
        # end up undef when pg_type is missing or has no such suffix.
        my ($precision, $scale) =
            defined $info->{pg_type}
            ? $info->{pg_type} =~ /\((\d+),(\d+)\)/
            : ();
        $info->{COLUMN_SIZE}    = $precision;
        $info->{DECIMAL_DIGITS} = $scale;
    }

    if (!exists $info->{pg_type_id}) {
        # get column data type id
        my $sth = $self->pg_column_type_id_sth;
        $sth->execute( @{$info}{qw/TABLE_SCHEM TABLE_NAME COLUMN_NAME/} );
        # List assignment takes the first column explicitly and yields undef
        # when no row matches.
        ($info->{pg_type_id}) = $sth->fetchrow_array;
        $sth->finish;
    }

    # type name can be domain or composite type
    # Keep a native_type_name already present on the row when pg_type is
    # absent, instead of overwriting it with undef.
    $info->{native_type_name} = $info->{pg_type}
        if defined $info->{pg_type};

    # always base type
    $info->{base_type_name} = $self->pg_base_type( $info->{pg_type_id} );
    my $base_type = defined $info->{base_type_name} ? $info->{base_type_name} : '';

    # bug: invalid COLUMN_SIZE for bit type
    if ($base_type eq 'bit') {
        if (defined $info->{native_type_name}
            && $info->{native_type_name} =~ /^bit\((\d+)\)$/) {
            $info->{COLUMN_SIZE} = $1;
        }
        else {
            $info->{COLUMN_SIZE} += 4;
        }
    }

    # bytea is the only type that needs output conversion in PL/Perl
    if ($base_type eq 'bytea') {
        $info->{conv_to_local} = 'to_pg_bytea';
    }

    return 1;
};
203              
204              
# Normalise a primary_key_info row before the inherited expansion runs.
#
# TABLE_NAME, TABLE_SCHEM, COLUMN_NAME and PK_NAME are passed through
# unquote_identifier() (workaround noted by the original author as
# "bug: quoted identifier"). The row is modified in place; the return value
# is whatever the wrapped method returns.
around 'expand_primary_key_info' => sub {
    my ($next, $self, $table) = @_;

    foreach my $key (qw(TABLE_NAME TABLE_SCHEM COLUMN_NAME PK_NAME)) {
        $table->{$key} = $self->unquote_identifier( $table->{$key} );
    }

    return $next->($self, $table);
};
214              
215              
# List functions matching schema and name patterns.
#
# Parameters:
#   $catalog - ignored
#   $schema  - LIKE pattern matched against the schema name
#   $routine - LIKE pattern matched against the function name
#   $type    - comma-separated list of routine types (single quotes are
#              stripped); only 'FUNCTION' is supported
#
# Returns an executed statement handle with one row per function, or nothing
# when $type is undefined or does not include 'FUNCTION'.
#
# SPECIFIC_NAME is the quoted function name followed by the argument list
# taken from proargtypes ("name type, ..."). An argument without a name
# contributes just its type, so overloads that differ only in unnamed
# argument types still get distinct SPECIFIC_NAME values. Aggregates and
# functions taking or returning cstring are excluded.
override 'routine_info' => sub {
    my ($self, $catalog, $schema, $routine, $type) = @_;

    # catalog ignored, type can be only 'FUNCTION'
    return unless defined $type;
    $type =~ s/'//g;
    return unless grep { $_ eq 'FUNCTION' } split /,/, $type;

    # include only 'in' and 'inout' arguments to call signature
    # 'out' arguments goes to column_info
    my $sth = $self->prepare_cached(<<'END_OF_SQL');
SELECT
  current_database() as "SPECIFIC_CATALOG",
  n.nspname as "SPECIFIC_SCHEMA",
  pg_catalog.quote_ident(p.proname)
  || '('
  || coalesce(pg_catalog.array_to_string(ARRAY(
    SELECT
      coalesce(nullif(p.proargnames[i+1], '') || ' ', '')
      || pg_catalog.format_type(p.proargtypes[s.i], null)
    FROM pg_catalog.generate_series(0, pg_catalog.array_upper(p.proargtypes, 1)) AS s(i)
  ), ', '), '')
  || ')' as "SPECIFIC_NAME",
  current_database() as "ROUTINE_CATALOG",
  n.nspname as "ROUTINE_SCHEMA",
  p.proname as "ROUTINE_NAME",
  'FUNCTION' as "ROUTINE_TYPE",
  case
    when p.proretset then 'TABLE'
    when t.typname = 'record' then 'TABLE'
    when t.typtype = 'c' then 'TABLE'
    when t.typelem <> 0 then 'ARRAY'
    when t.typtype = 'b' then pg_catalog.format_type(t.oid, NULL)
    else 'USER-DEFINED'
  end as "DATA_TYPE",
  'GENERAL'::text as "PARAMETER_STYLE",
  case p.provolatile
    when 'i' then 'YES'
    else 'NO'
  end as "IS_DETERMINISTIC",
  case
    when p.proisstrict then 'YES'
    else 'NO'
  end as "IS_NULL_CALL",
  ------------------
  p.oid as pg_routine_id,
  p.proretset as pg_return_set,
  p.prorettype as pg_return_type_id,
  p.provolatile as pg_volatile
FROM pg_catalog.pg_proc p
JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
LEFT JOIN pg_catalog.pg_type t ON t.oid = p.prorettype
WHERE p.prorettype <> 'pg_catalog.cstring'::pg_catalog.regtype
  AND (p.proargtypes[0] IS NULL
    OR p.proargtypes[0] <> 'pg_catalog.cstring'::pg_catalog.regtype)
  AND NOT p.proisagg
  AND n.nspname like ?
  AND p.proname like ?
ORDER BY 1, 2, 3;
END_OF_SQL

    $sth->execute($schema, $routine);
    return $sth;
};
279              
280              
# Describe the call arguments of one function.
#
# Parameters:
#   $routine_info - hash reference for the routine; its pg_routine_id entry
#                   (the pg_proc oid) selects the function
#
# Returns an array reference with one hash per entry of proargtypes, ordered
# by position. Each hash holds COLUMN_NAME, ORDINAL_POSITION,
# native_type_name and pg_type_id from the query, plus base_type_name
# resolved through pg_base_type().
override 'routine_argument_info_arrayref' => sub {
    my ($self, $routine_info) = @_;

    my $routine_id = $routine_info->{pg_routine_id};

    my $sth = $self->prepare_cached(<<'END_OF_SQL');
SELECT
  p.proargnames[i+1] as "COLUMN_NAME",
  s.i + 1 as "ORDINAL_POSITION",
  pg_catalog.format_type(p.proargtypes[s.i], null) as native_type_name,
  p.proargtypes[s.i] as pg_type_id
FROM pg_catalog.pg_proc p
  -- can't pass join column to function
  CROSS JOIN pg_catalog.generate_series(0, (
      SELECT pg_catalog.array_upper(proargtypes, 1)
      FROM pg_catalog.pg_proc
      WHERE oid = ?
    )
  ) as s(i)
WHERE p.oid = ?
ORDER BY s.i
END_OF_SQL

    # The oid is bound twice: once for the series bound, once for the row filter.
    $sth->execute($routine_id, $routine_id);

    my @arguments;
    while (my $argument = $sth->fetchrow_hashref) {
        $argument->{base_type_name} =
            $self->pg_base_type( $argument->{pg_type_id} );
        push @arguments, $argument;
    }

    return \@arguments;
};
317              
318              
# Describe the result columns of one function.
#
# Parameters:
#   $info - hash reference for the routine; pg_return_type_id selects the
#           return type and pg_routine_id the pg_proc row
#
# Returns an array reference of column description hashes:
#   * composite return type (typtype 'c'): the columns of the underlying
#     table/view as reported by column_info_arrayref(), or, when that yields
#     nothing, the attributes read from the catalog and accepted by
#     expand_column_info();
#   * 'record' return type: the function's output arguments, i.e. those with
#     mode 'o' (OUT), 'b' (INOUT) or 't' (column of RETURNS TABLE);
#   * anything else (base, domain or pseudo type): a single column named
#     'RESULT'.
#
# NOTE(review): $next (the wrapped method) is never invoked, so this modifier
# replaces the inherited implementation entirely -- confirm that is intended.
around 'routine_column_info_arrayref' => sub {
    my ($next, $self, $info) = @_;

    my @result = ();

    my $t = $self->pg_type_by_id( $info->{pg_return_type_id} );

    if ($t->{typtype} eq 'c') {
        # composite type

        # ...based on table/view
        # DBD::Pg->column_info has restriction (relkind in ('r','v'))
        @result = @{ $self->column_info_arrayref('%', $t->{type_schema}, $t->{typname}, '%') };

        # composite type (relkind='c')
        unless (@result) {
            my $column_info = $self->selectall_arrayref(<<'END_OF_SQL', {Slice=>{}}, $t->{typrelid});
SELECT
  pg_catalog.current_database() as "TABLE_CAT",
  ns.nspname as "TABLE_SCHEM",
  t.relname as "TABLE_NAME",
  a.attname as "COLUMN_NAME",
  null as "DATA_TYPE",
  pg_catalog.format_type(a.atttypid, a.atttypmod) as "TYPE_NAME",
  null as "COLUMN_SIZE",
  null as "BUFFER_LENGTH",
  null as "DECIMAL_DIGITS",
  null as "NUM_PREC_RADIX",
  case when a.attnotnull then 'NO' else 'YES' end as "NULLABLE",
  null as "REMARKS",
  null as "COLUMN_DEF",
  null as "SQL_DATA_TYPE",
  null as "SQL_DATETIME_SUB",
  null as "CHAR_OCTET_LENGTH",
  a.attnum as "ORDINAL_POSITION",
  case when a.attnotnull then 'NO' else 'YES' end as "IS_NULLABLE",
  --
  a.atttypid as pg_type_id,
  pg_catalog.format_type(a.atttypid, a.atttypmod) as native_type_name
FROM
  pg_catalog.pg_class t
  JOIN pg_catalog.pg_namespace ns ON ns.oid = t.relnamespace
  JOIN pg_catalog.pg_attribute a ON a.attrelid = t.oid
WHERE t.oid = ?
  AND a.attnum > 0 AND NOT a.attisdropped
END_OF_SQL
            # don't need SQL type

            # Keep only the rows that expand_column_info() accepts.
            for my $ci (@{$column_info}) {
                $self->expand_column_info($ci)
                    and push @result, $ci;
            }
        }

    } elsif ($t->{typname} eq 'record') {
        # returns out+inout arguments, and the columns of RETURNS TABLE
        # (argument mode 't'), which would otherwise produce no columns
        my $sth = $self->prepare_cached(<<'END_OF_SQL');
SELECT
  p.proargnames[i] as "COLUMN_NAME",
  pg_catalog.format_type(p.proallargtypes[s.i], null) as native_type_name,
  p.proallargtypes[s.i] as pg_type_id
FROM pg_catalog.pg_proc p
  -- can't pass join column to function
  CROSS JOIN pg_catalog.generate_series(0, (
      SELECT pg_catalog.array_upper(proallargtypes, 1)
      FROM pg_catalog.pg_proc
      WHERE oid = ?
    )
  ) as s(i)
WHERE p.oid = ?
  AND p.proargmodes[i] in ('o','b','t')
ORDER BY s.i
END_OF_SQL

        $sth->execute(
            $info->{pg_routine_id},
            $info->{pg_routine_id},
        );

        # Output columns are numbered consecutively from 1, independent of
        # their position in the full argument list.
        my $index = 1;
        while (my $c = $sth->fetchrow_hashref) {
            $c->{base_type_name}   = $self->pg_base_type($c->{pg_type_id});
            $c->{TYPE_NAME}        = $c->{base_type_name};
            $c->{NULLABLE}         = 'YES';
            $c->{ORDINAL_POSITION} = $index++;
            push @result, $c;
        }

    } else {
        # base, domain or pseudo type
        push @result, {
            COLUMN_NAME      => 'RESULT',
            TYPE_NAME        => $t->{native_type_name},
            NULLABLE         => 'YES',
            ORDINAL_POSITION => 1,
            pg_type_id       => $t->{oid},
            native_type_name => $t->{native_type_name},
            base_type_name   => $self->pg_base_type($t->{oid}),
        };
    }
    return \@result;
};
421              
422              
# Estimate the number of rows of an object.
#
# Parameters:
#   $catalog - not used by the TABLE branch
#   $schema  - schema name of the object
#   $object  - object (relation) name
#   $type    - object type; only 'TABLE' is handled here
#
# For a TABLE, returns pg_class.reltuples of the ordinary table
# (relkind 'r') with that schema and name: an estimated row count, updated
# by VACUUM. Any other type is delegated to the inherited implementation.
override 'get_number_of_rows' => sub {
    my ($self, $catalog, $schema, $object, $type) = @_;

    if (defined $type && $type eq 'TABLE') {
        # estimated row count, updated by VACUUM
        return $self->selectrow_array(<<'END_OF_SQL', {}, $schema, $object);
SELECT c.reltuples
FROM
  pg_catalog.pg_class c
  JOIN pg_catalog.pg_namespace ns ON ns.oid = c.relnamespace
WHERE ns.nspname = ?
  and c.relname = ?
  and c.relkind = 'r'
END_OF_SQL
    } else {
        return super();
    }
};
441              
442              
443             1;