File Coverage

blib/lib/Database/Async/Engine/PostgreSQL.pm
Criterion Covered Total %
statement 72 305 23.6
branch 8 66 12.1
condition 0 61 0.0
subroutine 23 84 27.3
pod 19 35 54.2
total 122 551 22.1


line stmt bran cond sub pod time code
1             package Database::Async::Engine::PostgreSQL;
2             # ABSTRACT: PostgreSQL support for Database::Async
3              
4 3     3   824597 use strict;
  3         58  
  3         111  
5 3     3   27 use warnings;
  3         8  
  3         157  
6              
7             our $VERSION = '0.011';
8              
9 3     3   18 use parent qw(Database::Async::Engine);
  3         6  
  3         20  
10              
11             =head1 NAME
12              
13             Database::Async::Engine::PostgreSQL - support for PostgreSQL databases in L
14              
15             =head1 DESCRIPTION
16              
17             Provide a C URI when instantiating L to use this engine.
18              
19             $loop->add(
20             my $dbh = Database::Async->new(
21             uri => 'postgresql://localhost'
22             )
23             );
24              
25             Connection can also be made using a service definition, as described in L.
26              
27             $loop->add(
28             my $dbh = Database::Async->new(
29             type => 'postgresql',
30             engine => {
31             service => 'example',
32             }
33             )
34             );
35              
36             If neither URI nor service are provided, the C environment variable is attempted, and will fall back
37             to localhost (similar to C behaviour).
38              
39             $loop->add(
40             my $dbh = Database::Async->new(
41             type => 'postgresql',
42             )
43             );
44              
45              
46             =cut
47              
48 3     3   25260 no indirect;
  3         1093  
  3         26  
49 3     3   666 use Ryu::Async;
  3         154478  
  3         106  
50 3     3   1551 use Ryu::Observable;
  3         4316  
  3         108  
51 3     3   23 use curry;
  3         7  
  3         68  
52 3     3   15 use Scalar::Util ();
  3         6  
  3         56  
53 3     3   1508 use URI::postgres;
  3         5274  
  3         135  
54 3     3   1474 use URI::QueryParam;
  3         2771  
  3         116  
55 3     3   683 use Future::AsyncAwait;
  3         2222  
  3         31  
56 3     3   693 use Database::Async::Query;
  3         3710  
  3         110  
57 3     3   1769 use File::HomeDir;
  3         19373  
  3         219  
58 3     3   1653 use Config::Tiny;
  3         3481  
  3         110  
59 3     3   24 use Encode ();
  3         6  
  3         83  
60 3     3   1493 use Unicode::UTF8;
  3         1738  
  3         453  
61              
62 3     3   1641 use Protocol::Database::PostgreSQL::Client qw(0.008);
  3         102550  
  3         232  
63 3     3   1960 use Protocol::Database::PostgreSQL::Constants qw(:v1);
  3         1663  
  3         500  
64              
65 3     3   25 use Log::Any qw($log);
  3         7  
  3         28  
66              
67             use overload
68 0     0   0 '""' => sub { ref(shift) },
69 0     0   0 bool => sub { 1 },
70 3     3   938 fallback => 1;
  3         9  
  3         40  
71              
72             Database::Async::Engine->register_class(
73             postgresql => __PACKAGE__
74             );
75              
76             =head1 METHODS
77              
78             =head2 configure
79              
80             =cut
81              
82             sub configure {
83 0     0 1 0 my ($self, %args) = @_;
84 0         0 for (qw(service encoding)) {
85 0 0       0 $self->{$_} = delete $args{$_} if exists $args{$_};
86             }
87 0         0 return $self->next::method(%args);
88             }
89              
90 0     0 0 0 sub encoding { shift->{encoding} }
91              
92             =head2 connection
93              
94             Returns a L representing the database connection,
95             and will attempt to connect if we are not already connected.
96              
97             =cut
98              
99             sub connection {
100 0     0 1 0 my ($self) = @_;
101 0   0     0 $self->{connection} //= $self->connect;
102             }
103              
104             =head2 ssl
105              
106             Whether to try SSL or not, expected to be one of the following
107             values from L:
108              
109             =over 4
110              
111             =item * C
112              
113             =item * C
114              
115             =item * C
116              
117             =back
118              
119             =cut
120              
121 0     0 1 0 sub ssl { shift->{ssl} }
122              
123             =head2 read_len
124              
125             Buffer read length. Higher values mean we will attempt to read more
126             data for each I/O loop iteration.
127              
128             Defaults to 2 megabytes.
129              
130             =cut
131              
132 0   0 0 1 0 sub read_len { shift->{read_len} //= 2 * 1024 * 1024 }
133              
134             =head2 write_len
135              
136             Buffer write length. Higher values mean we will attempt to write more
137             data for each I/O loop iteration.
138              
139             Defaults to 2 megabytes.
140              
141             =cut
142              
143 0   0 0 1 0 sub write_len { shift->{write_len} //= 2 * 1024 * 1024 }
144              
145             =head2 connect
146              
147             Establish a connection to the server.
148              
149             Returns a L which resolves to the L
150             once ready.
151              
152             =cut
153              
154 0     0 1 0 async sub connect {
155 0         0 my ($self) = @_;
156 0         0 my $loop = $self->loop;
157              
158             # Initial connection is made directly through the URI
159             # parameters. Eventually we also want to support UNIX
160             # socket and other types.
161 0 0 0     0 $self->{uri} ||= $self->uri_for_service($self->service) if $self->service;
162 0         0 my $uri = $self->uri;
163 0 0       0 die 'bad URI' unless ref $uri;
164 0         0 $log->tracef('URI for connection is %s', "$uri");
165 0         0 my $endpoint = join ':', $uri->host, $uri->port;
166 0         0 $log->tracef('Will connect to %s', $endpoint);
167 0         0 $self->{ssl} = do {
168 0   0     0 my $mode = $uri->query_param('sslmode') // 'prefer';
169 0   0     0 $Protocol::Database::PostgreSQL::Constants::SSL_NAME_MAP{$mode} // die 'unknown SSL mode ' . $mode;
170             };
171              
172 0         0 my $sock = await $loop->connect(
173             service => $uri->port,
174             host => $uri->host,
175             socktype => 'stream',
176             );
177              
178 0         0 my $local = join ':', $sock->sockhost_service(1);
179 0         0 my $remote = join ':', $sock->peerhost_service(1);
180 0         0 $log->tracef('Connected to %s as %s from %s', $endpoint, $remote, $local);
181              
182             # We start with a null handler for read, because our behaviour varies depending on
183             # whether we want to go through the SSL dance or not.
184             $self->add_child(
185             my $stream = IO::Async::Stream->new(
186             handle => $sock,
187 0     0   0 on_read => sub { 0 }
188             )
189 0         0 );
190              
191             # SSL is conveniently simple: a prefix exchange before the real session starts,
192             # and the user can decide whether SSL is mandatory or optional.
193 0         0 $stream = await $self->negotiate_ssl(
194             stream => $stream,
195             );
196              
197 0         0 Scalar::Util::weaken($self->{stream} = $stream);
198             $self->outgoing->each(sub {
199 0     0   0 $log->tracef('Write bytes [%v02x]', $_);
200 0         0 $self->ready_for_query->set_string('');
201 0         0 $self->stream->write("$_");
202 0         0 return;
203 0         0 });
204 0         0 $stream->configure(
205             on_read => $self->curry::weak::on_read,
206             read_len => $self->read_len,
207             write_len => $self->write_len,
208             autoflush => 0,
209             );
210              
211 0         0 $log->tracef('Send initial request with user %s', $uri->user);
212 0         0 my %qp = $uri->query_params;
213 0         0 delete $qp{sslmode};
214 0   0     0 $qp{application_name} //= $self->application_name;
215 0         0 $self->protocol->send_startup_request(
216             database => $self->database_name,
217             user => $self->database_user,
218             %qp
219             );
220 0         0 return $stream;
221             }
222              
223             =head2 service_conf_path
224              
225             Return the expected location for the pg_service.conf file.
226              
227             =cut
228              
229             sub service_conf_path {
230 8     8 1 3202 my ($class) = @_;
231 8 100       37 return $ENV{PGSERVICEFILE} if exists $ENV{PGSERVICEFILE};
232 4 100       23 return $ENV{PGSYSCONFDIR} . '/pg_service.conf' if exists $ENV{PGSYSCONFDIR};
233 2         10 my $path = File::HomeDir->my_home . '/.pg_service.conf';
234 2 100       55 return $path if -r $path;
235 1         9 return '/etc/pg_service.conf';
236             }
237              
238             sub service_parse {
239 3     3 0 8 my ($class, $path) = @_;
240 3         17 return Config::Tiny->read($path, 'encoding(UTF-8)');
241             }
242              
243             sub find_service {
244 3     3 0 4951 my ($class, $srv) = @_;
245 3         10 my $data = $class->service_parse(
246             $class->service_conf_path
247             );
248 3 100       1969 die 'service ' . $srv . ' not found in config' unless $data->{$srv};
249 2         19 return $data->{$srv};
250             }
251              
252 0   0 0 0   sub service { shift->{service} //= $ENV{PGSERVICE} }
253              
254             sub database_name {
255 0     0 0   my $uri = shift->uri;
256 0   0       return $uri->dbname // $uri->user // 'postgres'
      0        
257             }
258              
259             sub database_user {
260 0     0 0   my $uri = shift->uri;
261 0   0       return $uri->user // 'postgres'
262             }
263              
264             =head2 negotiate_ssl
265              
266             Apply SSL negotiation.
267              
268             =cut
269              
270 0     0 1   async sub negotiate_ssl {
271 0           my ($self, %args) = @_;
272 0           my $stream = delete $args{stream};
273              
274             # If SSL is disabled entirely, just return the same stream as-is
275 0 0         my $ssl = $self->ssl
276             or return $stream;
277              
278 0           require IO::Async::SSL;
279 0           require IO::Socket::SSL;
280              
281 0           $log->tracef('Attempting to negotiate SSL');
282 0           await $stream->write($self->protocol->ssl_request);
283              
284 0           $log->tracef('Waiting for response');
285 0           my ($resp, $eof) = await $stream->read_exactly(1);
286              
287 0 0         $log->tracef('Read %v02x from server for SSL response (EOF is %s)', $resp, $eof ? 'true' : 'false');
288 0 0         die 'Server closed connection' if $eof;
289              
290 0 0         if($resp eq 'S') {
    0          
291             # S for SSL...
292 0           $log->tracef('This is SSL, let us upgrade');
293             $stream = await $self->loop->SSL_upgrade(
294             handle => $stream,
295             # SSL defaults...
296             SSL_server => 0,
297             SSL_hostname => $self->uri->host,
298             SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_NONE(),
299             # Pass through anything SSL-related unchanged, the user knows
300             # better than we do
301 0           (map {; $_ => $self->{$_} } grep { /^SSL_/ } keys %$self)
302             );
303 0           $log->tracef('Upgrade complete');
304             } elsif($resp eq 'N') {
305             # N for "no SSL"...
306 0           $log->tracef('No to SSL');
307 0 0         die 'Server does not support SSL' if $self->ssl == SSL_REQUIRE;
308             } else {
309             # anything else is unexpected
310 0           die 'Unknown response to SSL request';
311             }
312 0           return $stream;
313             }
314              
315 0   0 0 0   sub is_replication { shift->{is_replication} //= 0 }
316 0   0 0 0   sub application_name { shift->{application_name} //= 'perl' }
317              
318             =head2 uri_for_dsn
319              
320             Returns a L corresponding to the given L.
321              
322             May throw an exception if we don't have a valid string.
323              
324             =cut
325              
326             sub uri_for_dsn {
327 0     0 1   my ($class, $dsn) = @_;
328 0 0         die 'invalid DSN, expecting DBI:Pg:...' unless $dsn =~ s/^DBI:Pg://i;
329 0           my %args = split /[=;]/, $dsn;
330 0           my $uri = URI->new('postgresql://postgres@localhost/postgres');
331 0           $uri->$_(delete $args{$_}) for grep exists $args{$_}, qw(host port user password dbname);
332 0           $uri
333             }
334              
335             sub uri_for_service {
336 0     0 0   my ($class, $service) = @_;
337 0           my $cfg = $class->find_service($service);
338 0           my $uri = URI->new('postgresql://postgres@localhost/postgres');
339 0           $uri->$_(delete $cfg->{$_}) for grep exists $cfg->{$_}, qw(host port user password dbname);
340 0 0         $uri->host(delete $cfg->{hostaddr}) if exists $cfg->{hostaddr};
341 0           $uri->query_param($_ => delete $cfg->{$_}) for grep exists $cfg->{$_}, qw(
342             application_name
343             fallback_application_name
344             keepalives
345             options
346             sslmode
347             replication
348             );
349 0           $uri
350             }
351              
352             =head2 stream
353              
354             The L representing the database connection.
355              
356             =cut
357              
358 0     0 1   sub stream { shift->{stream} }
359              
360             =head2 on_read
361              
362             Process incoming database packets.
363              
364             Expects the following parameters:
365              
366             =over 4
367              
368             =item * C<$stream> - the L we are receiving data on
369              
370             =item * C<$buffref> - a scalar reference to the current input data buffer
371              
372             =item * C<$eof> - true if we have reached the end of input
373              
374             =back
375              
376             =cut
377              
378             sub on_read {
379 0     0 1   my ($self, $stream, $buffref, $eof) = @_;
380              
381 0           $log->tracef('Have server message of length %d', length $$buffref);
382 0           while(my $msg = $self->protocol->extract_message($buffref)) {
383 0           $log->tracef('Message: %s', $msg);
384 0           $self->incoming->emit($msg);
385             }
386 0           return 0;
387             }
388              
389             =head2 ryu
390              
391             Provides a L instance.
392              
393             =cut
394              
395             sub ryu {
396 0     0 1   my ($self) = @_;
397 0   0       $self->{ryu} //= do {
398 0           $self->add_child(
399             my $ryu = Ryu::Async->new
400             );
401 0           $ryu
402             }
403             }
404              
405             =head2 outgoing
406              
407             L representing outgoing packets for the current database connection.
408              
409             =cut
410              
411             sub outgoing {
412 0     0 1   my ($self) = @_;
413 0   0       $self->{outgoing} //= $self->ryu->source;
414             }
415              
416             =head2 incoming
417              
418             L representing incoming packets for the current database connection.
419              
420             =cut
421              
422             sub incoming {
423 0     0 1   my ($self) = @_;
424 0   0       $self->{incoming} //= $self->ryu->source;
425             }
426              
427             =head2 authenticated
428              
429             Resolves once database authentication is complete.
430              
431             =cut
432              
433             sub authenticated {
434 0     0 1   my ($self) = @_;
435 0   0       $self->{authenticated} //= $self->loop->new_future;
436             }
437              
438             # Handlers for authentication messages from backend.
439             our %AUTH_HANDLER = (
440             AuthenticationOk => sub {
441             my ($self, $msg) = @_;
442             $self->authenticated->done;
443             },
444             AuthenticationKerberosV5 => sub {
445             my ($self, $msg) = @_;
446             die "Not yet implemented";
447             },
448             AuthenticationCleartextPassword => sub {
449             my ($self, $msg) = @_;
450             $self->protocol->send_message(
451             'PasswordMessage',
452             user => $self->encode_text($self->uri->user),
453             password_type => 'plain',
454             password => $self->encode_text($self->uri->password),
455             );
456             },
457             AuthenticationMD5Password => sub {
458             my ($self, $msg) = @_;
459             $self->protocol->send_message(
460             'PasswordMessage',
461             user => $self->encode_text($self->uri->user),
462             password_type => 'md5',
463             password_salt => $msg->password_salt,
464             password => $self->encode_text($self->uri->password),
465             );
466             },
467             AuthenticationSCMCredential => sub {
468             my ($self, $msg) = @_;
469             die "Not yet implemented";
470             },
471             AuthenticationGSS => sub {
472             my ($self, $msg) = @_;
473             die "Not yet implemented";
474             },
475             AuthenticationSSPI => sub {
476             my ($self, $msg) = @_;
477             die "Not yet implemented";
478             },
479             AuthenticationGSSContinue => sub {
480             my ($self, $msg) = @_;
481             die "Not yet implemented";
482             }
483             );
484              
485             =head2 protocol
486              
487             Returns the L instance, creating it
488             and setting up event handlers if necessary.
489              
490             =cut
491              
492             sub protocol {
493 0     0 1   my ($self) = @_;
494 0   0       $self->{protocol} //= do {
495 0           my $pg = Protocol::Database::PostgreSQL::Client->new(
496             database => $self->database_name,
497             outgoing => $self->outgoing,
498             );
499             $self->incoming
500             ->switch_str(
501 0     0     sub { $_->type },
502             authentication_request => $self->$curry::weak(sub {
503 0     0     my ($self, $msg) = @_;
504 0           $log->tracef('Auth request received: %s', $msg);
505 0 0         my $code = $AUTH_HANDLER{$msg->auth_type}
506             or $log->errorf('unknown auth type %s', $msg->auth_type);
507 0           $self->$code($msg);
508             }),
509             password => $self->$curry::weak(sub {
510 0     0     my ($self, %args) = @_;
511 0           $log->tracef('Auth request received: %s', \%args);
512 0           $self->protocol->{user} = $self->uri->user;
513 0           $self->protocol->send_message('PasswordMessage', password => $self->encode_text($self->uri->password));
514             }),
515             parameter_status => $self->$curry::weak(sub {
516 0     0     my ($self, $msg) = @_;
517 0           $log->tracef('Parameter received: %s', $msg);
518 0           $self->set_parameter(map $self->decode_text($_), $msg->key => $msg->value);
519             }),
520             row_description => $self->$curry::weak(sub {
521 0     0     my ($self, $msg) = @_;
522 0           $log->tracef('Row description %s', $msg);
523 0 0         $log->errorf('No active query?') unless my $q = $self->active_query;
524 0           $q->row_description($msg->description);
525             }),
526             data_row => $self->$curry::weak(sub {
527 0     0     my ($self, $msg) = @_;
528 0           $log->tracef('Have row data %s', $msg);
529 0           $self->active_query->row([ map $self->decode_text($_), $msg->fields ]);
530             }),
531             command_complete => $self->$curry::weak(sub {
532 0     0     my ($self, $msg) = @_;
533 0 0         my $query = delete $self->{active_query} or do {
534 0           $log->warnf('Command complete but no query');
535 0           return;
536             };
537 0           $log->tracef('Completed query %s with result %s', $query, $msg->result);
538 0 0         $query->done unless $query->completed->is_ready;
539             }),
540             no_data => $self->$curry::weak(sub {
541 0     0     my ($self, $msg) = @_;
542 0           $log->tracef('Completed query %s with no data', $self->active_query);
543             # my $query = delete $self->{active_query};
544             # $query->done if $query;
545             }),
546             send_request => $self->$curry::weak(sub {
547 0     0     my ($self, $msg) = @_;
548 0           $log->tracef('Send request for %s', $msg);
549 0           $self->stream->write($msg);
550             }),
551             ready_for_query => $self->$curry::weak(sub {
552 0     0     my ($self, $msg) = @_;
553 0           $log->tracef('Ready for query, state is %s', $msg->state);
554 0           $self->ready_for_query->set_string($msg->state);
555 0 0         $self->db->engine_ready($self) if $self->db;
556             }),
557             backend_key_data => $self->$curry::weak(sub {
558 0     0     my ($self, $msg) = @_;
559 0           $log->tracef('Backend key data: pid %d, key 0x%08x', $msg->pid, $msg->key);
560             }),
561             parse_complete => $self->$curry::weak(sub {
562 0     0     my ($self, $msg) = @_;
563 0           $log->tracef('Parsing complete for query %s', $self->active_query);
564             }),
565             bind_complete => $self->$curry::weak(sub {
566 0     0     my ($self, $msg) = @_;
567 0           $log->tracef('Bind complete for query %s', $self->active_query);
568             }),
569             close_complete => $self->$curry::weak(sub {
570 0     0     my ($self, $msg) = @_;
571 0           $log->tracef('Close complete for query %s', $self->active_query);
572             }),
573             empty_query_response => $self->$curry::weak(sub {
574 0     0     my ($self, $msg) = @_;
575 0           $log->tracef('Query returned no results for %s', $self->active_query);
576             }),
577             error_response => $self->$curry::weak(sub {
578 0     0     my ($self, $msg) = @_;
579 0           my $query = $self->active_query;
580 0           $log->warnf('Query returned error %s for %s', $msg->error, $self->active_query);
581 0           my $f = $query->completed;
582 0 0         $f->fail($msg->error) unless $f->is_ready;
583             }),
584             copy_in_response => $self->$curry::weak(sub {
585 0     0     my ($self, $msg) = @_;
586 0           my $query = $self->active_query;
587 0           $log->tracef('Ready to copy data for %s', $query);
588 0           my $proto = $self->protocol;
589             {
590 0           my $src = $query->streaming_input;
  0            
591             $src->completed
592             ->on_ready(sub {
593 0           my ($f) = @_;
594 0           $log->tracef('Sending copy done notification, stream status was %s', $f->state);
595 0           $proto->send_message(
596             'CopyDone',
597             data => '',
598             );
599 0           $proto->send_message(
600             'Close',
601             portal => '',
602             statement => '',
603             );
604 0           $proto->send_message(
605             'Sync',
606             portal => '',
607             statement => '',
608             );
609 0           });
610             $src->each(sub {
611 0           $log->tracef('Sending %s', $_);
612 0           $proto->send_copy_data($_);
613 0           });
614             }
615 0 0         $query->ready_to_stream->done unless $query->ready_to_stream->is_ready;
616             }),
617             copy_out_response => $self->$curry::weak(sub {
618 0     0     my ($self, $msg) = @_;
619 0           $log->tracef('copy out starts %s', $msg);
620             # $self->active_query->row([ $msg->fields ]);
621             }),
622             copy_data => $self->$curry::weak(sub {
623 0     0     my ($self, $msg) = @_;
624 0           $log->tracef('Have copy data %s', $msg);
625 0 0         my $query = $self->active_query or do {
626 0           $log->warnf('No active query for copy data');
627 0           return;
628             };
629 0           $query->row([ map $self->decode_text($_), @$_ ]) for $msg->rows;
630             }),
631             copy_done => $self->$curry::weak(sub {
632 0     0     my ($self, $msg) = @_;
633 0           $log->tracef('Copy done - %s', $msg);
634             }),
635             notification_response => $self->$curry::weak(sub {
636 0     0     my ($self, $msg) = @_;
637 0           my ($chan, $data) = @{$msg}{qw(channel data)};
  0            
638 0           $log->tracef('Notification on channel %s containing %s', $chan, $data);
639 0           $self->db->notification($self, map $self->decode_text($_), $chan, $data);
640             }),
641 0     0     sub { $log->errorf('Unknown message %s (type %s)', $_, $_->type) }
642 0           );
643 0           $pg
644             }
645             }
646              
647             sub stream_from {
648 0     0 0   my ($self, $src) = @_;
649 0           my $proto = $self->proto;
650             $src->each(sub {
651 0     0     $log->tracef('Sending %s', $_);
652             # This is already UTF-8 encoded in the protocol handler,
653             # since it's a text-based protocol
654 0           $proto->send_copy_data($_);
655             })
656 0           }
657              
658             =head2 set_parameter
659              
660             Marks a parameter update from the server.
661              
662             =cut
663              
664             sub set_parameter {
665 0     0 1   my ($self, $k, $v) = @_;
666 0 0         if(my $param = $self->{parameter}{$k}) {
667 0           $param->set_string($v);
668             } else {
669 0           $self->{parameter}{$k} = Ryu::Observable->new($v);
670             }
671 0           $self
672             }
673              
674             =head2 idle
675              
676             Resolves when we are idle and ready to process the next request.
677              
678             =cut
679              
680             sub idle {
681 0     0 1   my ($self) = @_;
682             $self->{idle} //= $self->loop->new_future->on_ready(sub {
683             delete $self->{idle}
684 0   0 0     });
  0            
685             }
686              
687             sub ready_for_query {
688 0     0 0   my ($self) = @_;
689 0   0       $self->{ready_for_query} //= do {
690             Ryu::Observable->new(0)->subscribe($self->$curry::weak(sub {
691 0     0     my ($self, $v) = @_;
692 0 0 0       return unless my $idle = $self->{idle} and $v;
693 0 0         $idle->done unless $idle->is_ready;
694 0           }))
695             }
696             }
697              
698             sub simple_query {
699 0     0 0   my ($self, $sql) = @_;
700 0 0         die 'already have active query' if $self->{active_query};
701 0           $self->{active_query} = my $query = Database::Async::Query->new(
702             sql => $sql,
703             row_data => my $src = $self->ryu->source
704             );
705 0           $self->protocol->simple_query($self->encode_text($query->sql));
706 0           return $src;
707             }
708              
709             sub encode_text {
710 0     0 0   my ($self, $txt) = @_;
711 0 0 0       return $txt unless defined $txt and my $encoding = $self->encoding;
712 0 0         return Unicode::UTF8::encode_utf8($txt) if $encoding eq 'UTF-8';
713 0           return Encode::encode($encoding, $txt, Encode::FB_CROAK);
714             }
715              
716             sub decode_text {
717 0     0 0   my ($self, $txt) = @_;
718 0 0 0       return $txt unless defined $txt and my $encoding = $self->encoding;
719 0 0         return Unicode::UTF8::decode_utf8($txt) if $encoding eq 'UTF-8';
720 0           return Encode::decode($encoding, $txt, Encode::FB_CROAK);
721             }
722              
723             sub handle_query {
724 0     0 0   my ($self, $query) = @_;
725 0 0         die 'already have active query' if $self->{active_query};
726 0           $self->{active_query} = $query;
727 0           my $proto = $self->protocol;
728 0           $proto->send_message(
729             'Parse',
730             sql => $self->encode_text($query->sql),
731             statement => '',
732             );
733 0           $proto->send_message(
734             'Bind',
735             portal => '',
736             statement => '',
737             param => [ map $self->encode_text($_), $query->bind ],
738             );
739 0           $proto->send_message(
740             'Describe',
741             portal => '',
742             statement => '',
743             );
744 0           $proto->send_message(
745             'Execute',
746             portal => '',
747             statement => '',
748             );
749 0 0         unless($query->{in}) {
750 0           $proto->send_message(
751             'Close',
752             portal => '',
753             statement => '',
754             );
755 0           $proto->send_message(
756             'Sync',
757             portal => '',
758             statement => '',
759             );
760             }
761             Future->done
762 0           }
763              
764 0     0 1   sub query { die 'use handle_query instead'; }
765              
766 0     0 0   sub active_query { shift->{active_query} }
767              
768             1;
769              
770             __END__