File Coverage

blib/lib/Database/Async/Engine/PostgreSQL.pm
Criterion Covered Total %
statement 115 361 31.8
branch 18 98 18.3
condition 18 94 19.1
subroutine 34 94 36.1
pod 20 38 52.6
total 205 685 29.9


line stmt bran cond sub pod time code
1             package Database::Async::Engine::PostgreSQL;
2             # ABSTRACT: PostgreSQL support for Database::Async
3              
4 5     5   1208787 use strict;
  5         57  
  5         195  
5 5     5   35 use warnings;
  5         12  
  5         140  
6              
7 5     5   1158 use utf8;
  5         38  
  5         31  
8              
9             our $VERSION = '1.001';
10              
11 5     5   296 use parent qw(Database::Async::Engine);
  5         11  
  5         29  
12              
13             =encoding utf8
14              
15             =head1 NAME
16              
17             Database::Async::Engine::PostgreSQL - support for PostgreSQL databases in L<Database::Async>
18              
19             =head1 DESCRIPTION
20              
21             Provide a C<postgresql://> URI when instantiating L<Database::Async> to use this engine.
22              
23             $loop->add(
24             my $dbh = Database::Async->new(
25             uri => 'postgresql://localhost'
26             )
27             );
28              
29             Connection can also be made using a service definition, as described in L<https://www.postgresql.org/docs/current/libpq-pgservice.html>.
30              
31             $loop->add(
32             my $dbh = Database::Async->new(
33             type => 'postgresql',
34             engine => {
35             service => 'example',
36             }
37             )
38             );
39              
40             If neither URI nor service are provided, the C<PGSERVICE> environment variable is attempted, and will fall back
41             to localhost (similar to C<psql> behaviour).
42              
43             $loop->add(
44             my $dbh = Database::Async->new(
45             type => 'postgresql',
46             )
47             );
48              
49              
50             =cut
51              
52 5     5   52753 no indirect;
  5         2205  
  5         31  
53 5     5   1240 use Syntax::Keyword::Try;
  5         4312  
  5         39  
54 5     5   1384 use Ryu::Async;
  5         300386  
  5         226  
55 5     5   2373 use Ryu::Observable;
  5         9948  
  5         181  
56 5     5   38 use curry;
  5         11  
  5         117  
57 5     5   25 use Scalar::Util ();
  5         13  
  5         98  
58 5     5   2115 use URI::postgres;
  5         9230  
  5         218  
59 5     5   2225 use URI::QueryParam;
  5         4572  
  5         205  
60 5     5   35 use Future::AsyncAwait;
  5         12  
  5         44  
61 5     5   1153 use Database::Async::Query;
  5         8179  
  5         209  
62 5     5   2546 use File::HomeDir;
  5         29355  
  5         400  
63 5     5   2406 use Config::Tiny;
  5         5837  
  5         218  
64 5     5   39 use Encode ();
  5         13  
  5         95  
65 5     5   2239 use MIME::Base64 ();
  5         3679  
  5         177  
66 5     5   2818 use Bytes::Random::Secure ();
  5         47834  
  5         186  
67 5     5   2048 use Unicode::UTF8;
  5         2583  
  5         258  
68 5     5   2267 use Crypt::Digest::SHA256 ();
  5         22462  
  5         154  
69 5     5   2373 use Crypt::Mac::HMAC ();
  5         5873  
  5         264  
70              
71 5     5   2170 use Protocol::Database::PostgreSQL::Client qw(2.000);
  5         159415  
  5         670  
72 5     5   2742 use Protocol::Database::PostgreSQL::Constants qw(:v1);
  5         2306  
  5         852  
73              
74 5     5   40 use Log::Any qw($log);
  5         12  
  5         39  
75              
76             use overload
               # Stringifying an engine object yields its class name; objects are
               # always true in boolean context; fallback => 1 lets Perl derive any
               # other operators from these.
77 0     0   0 '""' => sub { ref(shift) },
78 0     0   0 bool => sub { 1 },
79 5     5   1401 fallback => 1;
  5         12  
  5         62  
80              
               # Register this package with the base engine class under the name
               # 'postgresql'.
81             Database::Async::Engine->register_class(
82             postgresql => __PACKAGE__
83             );
84              
85             =head1 METHODS
86              
87             =head2 configure
88              
89             =cut
90              
91             sub configure {
               # Extract the engine-specific options (service, encoding,
               # application_name) from %args and store them on the instance; all
               # remaining arguments are handed on via next::method.
92 4     4 1 8781 my ($self, %args) = @_;
93 4         13 for (qw(service encoding application_name)) {
               # Only overwrite a stored value when the caller supplied this key
94 12 50       33 $self->{$_} = delete $args{$_} if exists $args{$_};
95             }
96 4         20 return $self->next::method(%args);
97             }
98              
99 0     0 0 0 sub encoding { shift->{encoding} } # Accessor: returns the stored 'encoding' value (set through configure)
100              
101             =head2 connection
102              
103             Returns a L<Future> representing the database connection,
104             and will attempt to connect if we are not already connected.
105              
106             =cut
107              
108             sub connection {
               # Returns the cached connection value, calling connect() only when
               # nothing has been stored in $self->{connection} yet (//= leaves an
               # existing defined value untouched).
109 0     0 1 0 my ($self) = @_;
110 0   0     0 $self->{connection} //= $self->connect;
111             }
112              
113             =head2 ssl
114              
115             Whether to try SSL or not, expected to be one of the following
116             values from L<Protocol::Database::PostgreSQL::Constants>:
117              
118             =over 4
119              
120             =item * C<SSL_REQUIRE>
121              
122             =item * C<SSL_PREFER>
123              
124             =item * C<SSL_DISABLE>
125              
126             =back
127              
128             =cut
129              
130 0     0 1 0 sub ssl { shift->{ssl} } # Accessor: returns the stored SSL mode (assigned in connect from the URI's sslmode parameter)
131              
132             =head2 read_len
133              
134             Buffer read length. Higher values mean we will attempt to read more
135             data for each I/O loop iteration.
136              
137             Defaults to 2 megabytes.
138              
139             =cut
140              
141 0   0 0 1 0 sub read_len { shift->{read_len} //= 2 * 1024 * 1024 } # Returns the read buffer length; stores and returns 2 * 1024 * 1024 if none is set
142              
143             =head2 write_len
144              
145             Buffer write length. Higher values mean we will attempt to write more
146             data for each I/O loop iteration.
147              
148             Defaults to 2 megabytes.
149              
150             =cut
151              
152 0   0 0 1 0 sub write_len { shift->{write_len} //= 2 * 1024 * 1024 } # Returns the write buffer length; stores and returns 2 * 1024 * 1024 if none is set
153              
154             =head2 connect
155              
156             Establish a connection to the server.
157              
158             Returns a L<Future> which resolves to the L<IO::Async::Stream>
159             once ready.
160              
161             =cut
162              
163 0     0 1 0 async sub connect {
               # Establish the socket connection, run SSL negotiation, wire the
               # stream up to the protocol handler and send the startup packet.
               # Dies if the 'connected' observable is already true, if no URI object
               # is available, or if the URI carries an unrecognised sslmode.
               # Resolves to the stream object once the startup request has been sent.
164 0         0 my ($self) = @_;
165 0         0 my $loop = $self->loop;
166              
167 0         0 my $connected = $self->connected;
168 0 0       0 die 'We think we are already connected, and that is bad' if $connected->as_numeric;
169              
170             # Initial connection is made directly through the URI
171             # parameters. Eventually we also want to support UNIX
172             # socket and other types.
               # A service name (explicit or from the environment, see service()) only
               # supplies the URI when none has been set already.
173 0 0 0     0 $self->{uri} ||= $self->uri_for_service($self->service) if $self->service;
174 0         0 my $uri = $self->uri;
175 0 0       0 die 'bad URI' unless ref $uri;
176 0         0 $log->tracef('URI for connection is %s', "$uri");
177 0         0 my $endpoint = join ':', $uri->host, $uri->port;
178              
179 0         0 $log->tracef('Will connect to %s', $endpoint);
               # Translate the textual sslmode (default 'prefer') to its constant via
               # SSL_NAME_MAP; an unmapped mode is fatal.
180 0         0 $self->{ssl} = do {
181 0   0     0 my $mode = $uri->query_param('sslmode') // 'prefer';
182 0   0     0 $Protocol::Database::PostgreSQL::Constants::SSL_NAME_MAP{$mode} // die 'unknown SSL mode ' . $mode;
183             };
184              
185             # We're assuming TCP (either v4 or v6) here, but there's not really any reason we couldn't have
186             # UNIX sockets or other transport layers here other than lack of demand so far.
187 0         0 my $sock = await $loop->connect(
188             service => $uri->port,
189             host => $uri->host,
190             socktype => 'stream',
191             );
192              
193 0         0 my $local = join ':', $sock->sockhost_service(1);
194 0         0 my $remote = join ':', $sock->peerhost_service(1);
195 0         0 $log->tracef('Connected to %s as %s from %s', $endpoint, $remote, $local);
196              
197             # We start with a null handler for read, because our behaviour varies depending on
198             # whether we want to go through the SSL dance or not.
               # NOTE(review): IO::Async::Stream is not in this file's visible `use`
               # list - presumably loaded by the parent class or the loop; confirm.
199             $self->add_child(
200             my $stream = IO::Async::Stream->new(
201             handle => $sock,
202 0     0   0 on_read => sub { 0 }
203             )
204 0         0 );
205              
206             # SSL is conveniently simple: a prefix exchange before the real session starts,
207             # and the user can decide whether SSL is mandatory or optional.
208 0         0 $stream = await $self->negotiate_ssl(
209             stream => $stream,
210             );
211              
               # Only a weak reference to the stream is kept on the instance.
212 0         0 Scalar::Util::weaken($self->{stream} = $stream);
               # Every item emitted on the outgoing source is written to the stream;
               # ready_for_query is reset to the empty string before each write.
213             $self->outgoing->each(sub {
214 0     0   0 $log->tracef('Write bytes [%v02x]', $_);
215 0         0 $self->ready_for_query->set_string('');
216 0         0 $self->stream->write("$_");
217 0         0 return;
218 0         0 });
               # Replace the null read handler with the real one now that SSL
               # negotiation has finished.
219 0         0 $stream->configure(
220             on_read => $self->curry::weak::on_read,
221             read_len => $self->read_len,
222             write_len => $self->write_len,
223             autoflush => 0,
224             );
225              
               # NOTE(review): this logs $uri->user, whereas the startup request below
               # sends database_user (which falls back to 'postgres') - the two can differ.
226 0         0 $log->tracef('Send initial request with user %s', $uri->user);
227              
228             # This is where the extensible options for initial connection are applied:
229             # we have already handled SSL by this point, so we exclude this from the
230             # list and pass everything else directly to the startup packet.
231 0         0 my %qp = $uri->query_params;
232 0         0 delete $qp{sslmode};
233              
               # %qp is listed last, so a 'database' or 'user' query parameter would
               # replace the values given before it in the argument list.
234 0   0     0 $qp{application_name} //= $self->application_name;
235 0         0 $self->protocol->send_startup_request(
236             database => $self->database_name,
237             user => $self->database_user,
238             %qp
239             );
240 0         0 $connected->set_numeric(1);
241 0         0 return $stream;
242             }
243              
244             =head2 service_conf_path
245              
246             Return the expected location for the pg_service.conf file.
247              
248             =cut
249              
250             sub service_conf_path {
               # Lookup order: $ENV{PGSERVICEFILE} verbatim, then
               # $ENV{PGSYSCONFDIR}/pg_service.conf, then ~/.pg_service.conf if that
               # file is readable, otherwise /etc/pg_service.conf. The environment
               # values are returned without checking that the file exists.
251 8     8 1 7645 my ($class) = @_;
252 8 100       45 return $ENV{PGSERVICEFILE} if exists $ENV{PGSERVICEFILE};
253 4 100       23 return $ENV{PGSYSCONFDIR} . '/pg_service.conf' if exists $ENV{PGSYSCONFDIR};
254 2         15 my $path = File::HomeDir->my_home . '/.pg_service.conf';
255 2 100       56 return $path if -r $path;
256 1         9 return '/etc/pg_service.conf';
257             }
258              
259             sub service_parse {
               # Reads the service file at $path with Config::Tiny, decoding it as
               # UTF-8, and returns whatever Config::Tiny->read returns.
260 3     3 0 8 my ($class, $path) = @_;
261 3         23 return Config::Tiny->read($path, 'encoding(UTF-8)');
262             }
263              
264             sub find_service {
               # Returns the configuration section named $srv from the service file
               # located by service_conf_path; dies when there is no such section.
               # NOTE(review): if the file cannot be read, Config::Tiny->read
               # presumably returns undef, in which case the failure is reported as
               # "service ... not found" rather than as a file error - confirm.
265 3     3 0 5858 my ($class, $srv) = @_;
266 3         12 my $data = $class->service_parse(
267             $class->service_conf_path
268             );
269 3 100       2250 die 'service ' . $srv . ' not found in config' unless $data->{$srv};
270 2         22 return $data->{$srv};
271             }
272              
273 0   0 0 0 0 sub service { shift->{service} //= $ENV{PGSERVICE} } # Returns the service name, falling back to (and caching) $ENV{PGSERVICE} when none is stored
274              
275             sub database_name {
               # Database name from the URI; if undefined, the URI user name is used,
               # and 'postgres' is the final fallback.
276 8     8 0 314 my $uri = shift->uri;
277 8   33     53 return $uri->dbname // $uri->user // 'postgres'
      50        
278             }
279              
280             sub database_user {
               # User name from the URI, or 'postgres' when the URI has none.
281 8     8 0 357 my $uri = shift->uri;
282 8   50     62 return $uri->user // 'postgres'
283             }
284              
285             sub password_from_file {
               # Look up a password in a pgpass-style file: $ENV{PGPASSFILE} if set to
               # a true value, otherwise ~/.pgpass. Returns the password from the first
               # line whose host, port, user and database fields each equal '*' or the
               # corresponding connection value; returns undef if the file is missing,
               # unusable, or has no matching line.
286 18     18 0 827 my $self = shift;
287 18   66     115 my $pwfile = $ENV{PGPASSFILE} || File::HomeDir->my_home . '/.pgpass';
288              
289 18 50       116 unless ($^O eq 'MSWin32') { # same as libpq
290             # libpq also does stat here instead of lstat. So, pgpass can be
291             # a logical link.
292 18 50       260 my (undef, undef, $mode) = stat $pwfile or return undef;
               # -f _ reuses the stat result from the line above
293 18 50       77 unless (-f _) {
294 0         0 $log->warnf("WARNING: password file \"%s\" is not a plain file\n", $pwfile);
295 0         0 return undef;
296             }
297              
               # Any group/other permission bit set means the file is ignored
298 18 100       54 if ($mode & 077) {
299 1         12 $log->warnf("WARNING: password file \"%s\" has group or world access; permissions should be u=rw (0600) or less", $pwfile);
300 1         118 return undef;
301             }
302             # libpq has the same race condition of stat versus open.
303             }
304              
305             # It's not an error for this file to be missing: it might not
306             # be readable for various reasons, but for now we ignore that case as well
307             # (we've already checked for overly-lax permissions above)
308 17 50       560 open my $fh, '<', $pwfile or return undef;
309              
310 17         369 while (defined(my $line = readline $fh)) {
               # Skip comment lines (the string '^#' is used as the pattern here)
311 17 50       77 next if $line =~ '^#';
312 17         37 chomp $line;
               # Split on unescaped colons: each field is a run of backslash-escaped
               # characters or non-colon characters.
               # NOTE(review): a line with fewer than five fields leaves the trailing
               # variables undef, so the unescape and comparisons below would operate
               # on undef values for such lines.
313 17 50       272 my ($host, $port, $db, $user, $pw) = ($line =~ /((?:\\.|[^:])*)(?::|$)/g)
314             or next;
               # Remove the escaping backslashes from every field
315 17         89 s/\\(.)/$1/g for ($host, $port, $db, $user, $pw);
316              
               # || binds tighter than 'and', so each line below is one
               # "wildcard or exact match" test and all four must hold.
317 17 50 66     178 return $pw if (
      66        
      33        
      66        
      33        
      66        
      33        
318             $host eq '*' || $host eq $self->uri->host and
319             $port eq '*' || $port eq $self->uri->port and
320             $user eq '*' || $user eq $self->database_user and
321             $db eq '*' || $db eq $self->database_name
322             );
323             }
324              
325 0         0 return undef;
326             }
327              
328             sub database_password {
               # Password source order: the URI, then $ENV{PGPASSWORD}, then the
               # password file.
               # NOTE(review): // and || have equal precedence and associate left, so
               # this parses as (uri password // PGPASSWORD) || password_from_file. A
               # defined-but-false value ('' or '0') from the URI or the environment
               # therefore falls through to the password file.
329 20     20 0 30529 my $self = shift;
330 20   66     75 return $self->uri->password // $ENV{PGPASSWORD} || $self->password_from_file
331             }
332              
333             =head2 negotiate_ssl
334              
335             Apply SSL negotiation.
336              
337             =cut
338              
339 0     0 1   async sub negotiate_ssl {
               # Takes a named 'stream' argument. When the stored SSL mode is false the
               # stream is returned unchanged; otherwise an SSL request is sent and the
               # single-byte reply decides what happens next: 'S' upgrades the stream
               # to SSL, 'N' keeps the plain stream unless the mode is SSL_REQUIRE (in
               # which case this dies), and anything else is fatal. Also dies if the
               # server closes the connection before answering.
340 0           my ($self, %args) = @_;
341 0           my $stream = delete $args{stream};
342              
343             # If SSL is disabled entirely, just return the same stream as-is
344 0 0         my $ssl = $self->ssl
345             or return $stream;
346              
               # Loaded on demand so that non-SSL connections do not need these modules
347 0           require IO::Async::SSL;
348 0           require IO::Socket::SSL;
349              
350 0           $log->tracef('Attempting to negotiate SSL');
351 0           await $stream->write($self->protocol->ssl_request);
352              
353 0           $log->tracef('Waiting for response');
354 0           my ($resp, $eof) = await $stream->read_exactly(1);
355              
356 0 0         $log->tracef('Read %v02x from server for SSL response (EOF is %s)', $resp, $eof ? 'true' : 'false');
357 0 0         die 'Server closed connection' if $eof;
358              
359 0 0         if($resp eq 'S') {
    0          
360             # S for SSL...
361 0           $log->tracef('This is SSL, let us upgrade');
               # NOTE(review): the default here is SSL_VERIFY_NONE, i.e. no certificate
               # verification unless the instance carries its own SSL_verify_mode. The
               # SSL_* keys from $self come last in the list - presumably later
               # entries override the defaults above them; confirm.
362             $stream = await $self->loop->SSL_upgrade(
363             handle => $stream,
364             # SSL defaults...
365             SSL_server => 0,
366             SSL_hostname => $self->uri->host,
367             SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_NONE(),
368             # Pass through anything SSL-related unchanged, the user knows
369             # better than we do
370 0           (map {; $_ => $self->{$_} } grep { /^SSL_/ } keys %$self)
371             );
372 0           $log->tracef('Upgrade complete');
373             } elsif($resp eq 'N') {
374             # N for "no SSL"...
375 0           $log->tracef('No to SSL');
376 0 0         die 'Server does not support SSL' if $self->ssl == SSL_REQUIRE;
377             } else {
378             # anything else is unexpected
379 0           die 'Unknown response to SSL request';
380             }
381 0           return $stream;
382             }
383              
384 0   0 0 0   sub is_replication { shift->{is_replication} //= 0 } # Returns the stored is_replication flag, defaulting (and caching) 0
385 0   0 0 0   sub application_name { shift->{application_name} //= 'perl' } # Returns the stored application name, defaulting (and caching) 'perl'
386              
387             =head2 uri_for_dsn
388              
389             Returns a L<URI> corresponding to the given L<DBI>-style data source name (C<DBI:Pg:...>).
390              
391             May throw an exception if we don't have a valid string.
392              
393             =cut
394              
395             sub uri_for_dsn {
               # Converts a 'DBI:Pg:key=value;...' string into a URI object, starting
               # from postgresql://postgres@localhost/postgres and applying any of
               # host, port, user, password and dbname found in the DSN. Dies when the
               # (case-insensitive) DBI:Pg: prefix is missing. Other DSN keys are
               # parsed into %args but not used.
396 0     0 1   my ($class, $dsn) = @_;
397 0 0         die 'invalid DSN, expecting DBI:Pg:...' unless $dsn =~ s/^DBI:Pg://i;
               # NOTE(review): splitting on both '=' and ';' means a value containing
               # either character would shift the key/value pairing.
398 0           my %args = split /[=;]/, $dsn;
399 0           my $uri = URI->new('postgresql://postgres@localhost/postgres');
               # Each recognised key is applied by calling the URI method of the same name
400 0           $uri->$_(delete $args{$_}) for grep exists $args{$_}, qw(host port user password dbname);
401 0           $uri
402             }
403              
404             sub uri_for_service {
               # Builds a URI object from the named service definition (looked up via
               # find_service, which dies for an unknown service). Starts from
               # postgresql://postgres@localhost/postgres, applies the standard
               # connection fields, then copies a fixed list of further settings into
               # query parameters.
               # NOTE(review): the `delete` calls below remove keys from the hash
               # returned by find_service rather than from a copy.
405 0     0 0   my ($class, $service) = @_;
406 0           my $cfg = $class->find_service($service);
407              
408             # Start with common default values (i.e. follow libpq behaviour unless there's a strong reason not to)
409 0           my $uri = URI->new('postgresql://postgres@localhost/postgres');
410              
411             # Standard fields supported by URI::pg
412 0           $uri->$_(delete $cfg->{$_}) for grep exists $cfg->{$_}, qw(host port user password dbname);
413             # ... note that `hostaddr` takes precedence over plain `host`
414 0 0         $uri->host(delete $cfg->{hostaddr}) if exists $cfg->{hostaddr};
415              
416             # Everything else is handled via query parameters, this list is non-exhaustive and likely to be
417             # extended in future (e.g. text/binary protocol mode)
418 0           $uri->query_param($_ => delete $cfg->{$_}) for grep exists $cfg->{$_}, qw(
419             application_name
420             fallback_application_name
421             keepalives
422             options
423             sslmode
424             replication
425             );
426 0           $uri
427             }
428              
429             =head2 stream
430              
431             The L<IO::Async::Stream> representing the database connection.
432              
433             =cut
434              
435 0     0 1   sub stream { shift->{stream} } # Accessor: returns the stream stored (as a weak reference) by connect
436              
437             =head2 on_read
438              
439             Process incoming database packets.
440              
441             Expects the following parameters:
442              
443             =over 4
444              
445             =item * C<$stream> - the L<IO::Async::Stream> we are receiving data on
446              
447             =item * C<$buffref> - a scalar reference to the current input data buffer
448              
449             =item * C<$eof> - true if we have reached the end of input
450              
451             =back
452              
453             =cut
454              
455             sub on_read {
               # Read handler for the stream: extracts every complete protocol message
               # currently in the buffer and emits each one on the incoming source.
               # Always returns 0.
456 0     0 1   my ($self, $stream, $buffref, $eof) = @_;
457              
458             try {
459             $log->tracef('Have server message of length %d', length $$buffref);
               # Loop until extract_message returns a false value
460             while(my $msg = $self->protocol->extract_message($buffref)) {
461             $log->tracef('Message: %s', $msg);
462             $self->incoming->emit($msg);
463             }
464             } catch($e) {
465             # This really shouldn't happen, but since we can't trust our current state we should drop
466             # the connection ASAP, and avoid any chance of barrelling through to a COMMIT or other
467             # risky operation.
468             $log->errorf('Failed to handle read, connection is no longer in a valid state: %s', $e);
469             $self->close_now;
470             } finally {
               # Runs on both the success and the failure path: EOF marks us disconnected
471             $self->connected->set_numeric(0) if $eof;
472             }
473 0           return 0;
  0            
474             }
475              
476             =head2 ryu
477              
478             Provides a L<Ryu::Async> instance.
479              
480             =cut
481              
482             sub ryu {
               # Lazily creates a Ryu::Async object, attaches it to this object with
               # add_child, and caches it in $self->{ryu} for later calls.
483 0     0 1   my ($self) = @_;
484 0   0       $self->{ryu} //= do {
485 0           $self->add_child(
486             my $ryu = Ryu::Async->new
487             );
488 0           $ryu
489             }
490             }
491              
492             =head2 outgoing
493              
494             L<Ryu::Source> representing outgoing packets for the current database connection.
495              
496             =cut
497              
498             sub outgoing {
               # Returns the source for outgoing packets, creating it from
               # $self->ryu->source on first use and caching it in $self->{outgoing}.
499 0     0 1   my ($self) = @_;
500 0   0       $self->{outgoing} //= $self->ryu->source;
501             }
502              
503             =head2 incoming
504              
505             L<Ryu::Source> representing incoming packets for the current database connection.
506              
507             =cut
508              
509             sub incoming {
               # Returns the source for incoming packets, creating it from
               # $self->ryu->source on first use and caching it in $self->{incoming}.
510 0     0 1   my ($self) = @_;
511 0   0       $self->{incoming} //= $self->ryu->source;
512             }
513              
514             =head2 connected
515              
516             A L<Ryu::Observable> which will be 1 while the connection is in a valid state,
517             and 0 if we're disconnected.
518              
519             =cut
520              
521             sub connected {
               # Returns the connection-state observable, creating it (initial value 0)
               # on first use. The subscription installed at creation time reacts to
               # false values only: it fails any active query and notifies the
               # database object that this engine has disconnected.
522 0     0 1   my ($self) = @_;
523 0   0       $self->{connected} //= do {
524 0           my $obs = Ryu::Observable->new(0);
               # curry::weak is used so the callback does not hold a strong reference to $self
525             $obs->subscribe(
526             $self->$curry::weak(sub {
527 0     0     my ($self, $v) = @_;
528             # We only care about disconnection events
529 0 0         return if $v;
530              
531             # If we were doing something, then it didn't work
532 0 0         if(my $query = delete $self->{active_query}) {
533 0 0         $query->completed->fail('disconnected') unless $query->completed->is_ready;
534             }
535              
536             # Tell the database pool management that we're no longer useful
537 0 0         if(my $db = $self->db) {
538 0           $db->engine_disconnected($self);
539             }
540             })
541 0           );
542 0           $obs
543             };
544             }
545              
546             =head2 authenticated
547              
548             Resolves once database authentication is complete.
549              
550             =cut
551              
552             sub authenticated {
               # Returns the authentication future, creating it from the loop on first
               # use and caching it in $self->{authenticated}. It is marked done by the
               # AuthenticationOk handler in %AUTH_HANDLER.
553 0     0 1   my ($self) = @_;
554 0   0       $self->{authenticated} //= $self->loop->new_future;
555             }
556              
557             # Handlers for authentication messages from backend.
558             our %AUTH_HANDLER = (
559             AuthenticationOk => sub {
560             my ($self, $msg) = @_;
561             $self->authenticated->done;
562             },
563             AuthenticationKerberosV5 => sub {
564             my ($self, $msg) = @_;
565             die "Not yet implemented";
566             },
567             AuthenticationCleartextPassword => sub {
568             my ($self, $msg) = @_;
569             $self->protocol->send_message(
570             'PasswordMessage',
571             user => $self->encode_text($self->uri->user),
572             password_type => 'plain',
573             password => $self->encode_text($self->database_password),
574             );
575             },
576             AuthenticationMD5Password => sub {
577             my ($self, $msg) = @_;
578             $self->protocol->send_message(
579             'PasswordMessage',
580             user => $self->encode_text($self->uri->user),
581             password_type => 'md5',
582             password_salt => $msg->password_salt,
583             password => $self->encode_text($self->database_password),
584             );
585             },
586             AuthenticationSCMCredential => sub {
587             my ($self, $msg) = @_;
588             die "Not yet implemented";
589             },
590             AuthenticationGSS => sub {
591             my ($self, $msg) = @_;
592             die "Not yet implemented";
593             },
594             AuthenticationSSPI => sub {
595             my ($self, $msg) = @_;
596             die "Not yet implemented";
597             },
598             AuthenticationGSSContinue => sub {
599             my ($self, $msg) = @_;
600             die "Not yet implemented";
601             },
602             AuthenticationSASL => sub {
603             my ($self, $msg) = @_;
604             $log->tracef('SASL starts');
605             my $nonce = MIME::Base64::encode_base64(Bytes::Random::Secure::random_string_from(join('', ('a'..'z'), ('A'..'Z'), ('0'..'9')), 18), '');
606             $self->{client_first_message} = 'n,,n=,r=' . $nonce;
607             $self->protocol->send_message(
608             'SASLInitialResponse',
609             mechanism => 'SCRAM-SHA-256',
610             nonce => $nonce,
611             );
612             },
613             AuthenticationSASLContinue => sub {
614             my ($self, $msg) = @_;
615             $log->tracef('Have msg %s', $msg);
616              
617             my $rounds = $msg->password_rounds or die 'need iteration count';
618              
619             my $server_first_message = $msg->server_first_message;
620             my $pass = Unicode::UTF8::encode_utf8($self->database_password);
621             my $salted_password = do {
622             my $hash = Crypt::Mac::HMAC::hmac('SHA256', $pass, MIME::Base64::decode_base64($msg->password_salt), pack('N1', 1));
623             my $out = $hash;
624             # Skip the first round - that's our original $hash value - and recursively re-hash
625             # for the remainder, incrementally building our bitwise XOR result
626             for my $idx (1..$rounds-1) {
627             $hash = Crypt::Mac::HMAC::hmac('SHA256', $pass, $hash);
628             $out = "$out" ^ "$hash";
629             }
630             $out
631             };
632             # The client key uses the literal string 'Client Key' as the base - for the server, it'd be 'Server Key'
633             my $client_key = Crypt::Mac::HMAC::hmac('SHA256', $salted_password, "Client Key");
634             my $server_key = Crypt::Mac::HMAC::hmac('SHA256', $salted_password, "Server Key");
635             # Then we hash this to get the stored key, which will be used for the signature
636             my $stored_key = Crypt::Digest::SHA256::sha256($client_key);
637              
638             my $client_first_message = $self->{client_first_message};
639             # Strip out the channel-binding GS2 header
640             my $header = 'n,,';
641             $client_first_message =~ s{^\Q$header}{};
642             # ... but we _do_ want the header in the final-message c= GS2 component
643             my $client_final_message = 'c=' . MIME::Base64::encode_base64($header, '') . ',r=' . $msg->password_nonce;
644              
645             # this is what we want to sign!
646             my $auth_message = join ',', $client_first_message, $server_first_message, $client_final_message;
647             $log->tracef('Auth message = %s', $auth_message);
648              
649             my $client_signature = Crypt::Mac::HMAC::hmac('SHA256', $stored_key, $auth_message);
650             my $client_proof = "$client_key" ^ "$client_signature";
651             $log->tracef('Client proof is %s', $client_proof);
652             my $server_signature = Crypt::Mac::HMAC::hmac('SHA256', $server_key, $auth_message);
653             $self->{expected_server_signature} = $server_signature;
654             $self->protocol->send_message(
655             'SASLResponse',
656             header => $header,
657             nonce => $msg->password_nonce,
658             proof => $client_proof,
659             );
660             },
661             AuthenticationSASLFinal => sub {
662             my ($self, $msg) = @_;
663             my $expected = MIME::Base64::encode_base64($self->{expected_server_signature}, '');
664             die 'invalid server signature ' . $msg->server_signature . ', expected ' . $expected unless $msg->server_signature eq $expected;
665             $log->tracef('Server signature seems fine, continue with auth');
666             # No further action required, we'll get an AuthenticationOk immediately after this
667             }
668             );
669              
670             =head2 protocol
671              
672             Returns the L<Protocol::Database::PostgreSQL::Client> instance, creating it
673             and setting up event handlers if necessary.
674              
675             =cut
676              
677             sub protocol {
678 0     0 1   my ($self) = @_;
679 0   0       $self->{protocol} //= do {
680 0           my $pg = Protocol::Database::PostgreSQL::Client->new(
681             database => $self->database_name,
682             outgoing => $self->outgoing,
683             );
684             $self->incoming
685             ->switch_str(
686 0     0     sub { $_->type },
687             authentication_request => $self->$curry::weak(sub {
688 0     0     my ($self, $msg) = @_;
689 0           $log->tracef('Auth request received: %s', $msg);
690 0 0         my $code = $AUTH_HANDLER{$msg->auth_type}
691             or $log->errorf('unknown auth type %s', $msg->auth_type);
692 0           $self->$code($msg);
693             }),
694             password => $self->$curry::weak(sub {
695 0     0     my ($self, %args) = @_;
696 0           $log->tracef('Auth request received: %s', \%args);
697 0           $self->protocol->{user} = $self->uri->user;
698 0           $self->protocol->send_message('PasswordMessage', password => $self->encode_text($self->database_password));
699             }),
700             parameter_status => $self->$curry::weak(sub {
701 0     0     my ($self, $msg) = @_;
702 0           $log->tracef('Parameter received: %s', $msg);
703 0           $self->set_parameter(map $self->decode_text($_), $msg->key => $msg->value);
704             }),
705             row_description => $self->$curry::weak(sub {
706 0     0     my ($self, $msg) = @_;
707 0           $log->tracef('Row description %s', $msg);
708 0 0         $log->errorf('No active query?') unless my $q = $self->active_query;
709 0           $q->row_description($msg->description);
710             }),
711             data_row => $self->$curry::weak(sub {
712 0     0     my ($self, $msg) = @_;
713 0           $log->tracef('Have row data %s', $msg);
714             $self->{fc} ||= $self->active_query->row_data->flow_control->each($self->$curry::weak(sub {
715 0           my ($self) = @_;
716 0 0         $log->tracef('Flow control event - will %s stream', $_ ? 'resume' : 'pause');
717 0 0         $self->stream->want_readready($_) if $self->stream;
718 0   0       }));
719 0           $self->active_query->row([ map $self->decode_text($_), $msg->fields ]);
720             }),
721             command_complete => $self->$curry::weak(sub {
722 0     0     my ($self, $msg) = @_;
723 0           delete $self->{fc};
724 0 0         my $query = delete $self->{active_query} or do {
725 0           $log->warnf('Command complete but no query');
726 0           return;
727             };
728 0           $log->tracef('Completed query %s with result %s', $query, $msg->result);
729 0 0         $query->done unless $query->completed->is_ready;
730             }),
731             no_data => $self->$curry::weak(sub {
732 0     0     my ($self, $msg) = @_;
733 0           $log->tracef('Completed query %s with no data', $self->active_query);
734             # my $query = delete $self->{active_query};
735             # $query->done if $query;
736             }),
737             send_request => $self->$curry::weak(sub {
738 0     0     my ($self, $msg) = @_;
739 0           $log->tracef('Send request for %s', $msg);
740 0           $self->stream->write($msg);
741             }),
742             ready_for_query => $self->$curry::weak(sub {
743 0     0     my ($self, $msg) = @_;
744 0           $log->tracef('Ready for query, state is %s', $msg->state);
745 0           $self->ready_for_query->set_string($msg->state);
746 0 0         $self->db->engine_ready($self) if $self->db;
747             }),
748             backend_key_data => $self->$curry::weak(sub {
749 0     0     my ($self, $msg) = @_;
750 0           $log->tracef('Backend key data: pid %d, key 0x%08x', $msg->pid, $msg->key);
751             }),
752             parse_complete => $self->$curry::weak(sub {
753 0     0     my ($self, $msg) = @_;
754 0           $log->tracef('Parsing complete for query %s', $self->active_query);
755             }),
756             bind_complete => $self->$curry::weak(sub {
757 0     0     my ($self, $msg) = @_;
758 0           $log->tracef('Bind complete for query %s', $self->active_query);
759             }),
760             close_complete => $self->$curry::weak(sub {
761 0     0     my ($self, $msg) = @_;
762 0           delete $self->{fc};
763 0           $log->tracef('Close complete for query %s', $self->active_query);
764             }),
765             empty_query_response => $self->$curry::weak(sub {
766 0     0     my ($self, $msg) = @_;
767 0           $log->tracef('Query returned no results for %s', $self->active_query);
768             }),
769             error_response => $self->$curry::weak(sub {
770 0     0     my ($self, $msg) = @_;
771 0 0         if(my $query = $self->active_query) {
772 0           $log->warnf('Query returned error %s for %s', $msg->error, $self->active_query);
773 0           my $f = $query->completed;
774 0 0         $f->fail($msg->error) unless $f->is_ready;
775             } else {
776 0           $log->errorf('Received error %s with no active query', $msg->error);
777             }
778             }),
779             copy_in_response => $self->$curry::weak(sub {
780 0     0     my ($self, $msg) = @_;
781 0           my $query = $self->active_query;
782 0           $log->tracef('Ready to copy data for %s', $query);
783 0           my $proto = $self->protocol;
784             {
785 0           my $src = $query->streaming_input;
  0            
786             $src->completed
787             ->on_ready(sub {
788 0           my ($f) = @_;
789 0           $log->tracef('Sending copy done notification, stream status was %s', $f->state);
790 0           $proto->send_message(
791             'CopyDone',
792             data => '',
793             );
794 0           $proto->send_message(
795             'Close',
796             portal => '',
797             statement => '',
798             );
799 0           $proto->send_message(
800             'Sync',
801             portal => '',
802             statement => '',
803             );
804 0           });
805             $src->each(sub {
806 0           $log->tracef('Sending %s', $_);
807 0           $proto->send_copy_data($_);
808 0           });
809             }
810 0 0         $query->ready_to_stream->done unless $query->ready_to_stream->is_ready;
811             }),
812             copy_out_response => $self->$curry::weak(sub {
813 0     0     my ($self, $msg) = @_;
814 0           $log->tracef('copy out starts %s', $msg);
815             # $self->active_query->row([ $msg->fields ]);
816             }),
817             copy_data => $self->$curry::weak(sub {
818 0     0     my ($self, $msg) = @_;
819 0           $log->tracef('Have copy data %s', $msg);
820 0 0         my $query = $self->active_query or do {
821 0           $log->warnf('No active query for copy data');
822 0           return;
823             };
824 0           $query->row([ map $self->decode_text($_), @$_ ]) for $msg->rows;
825             }),
826             copy_done => $self->$curry::weak(sub {
827 0     0     my ($self, $msg) = @_;
828 0           $log->tracef('Copy done - %s', $msg);
829             }),
830             notification_response => $self->$curry::weak(sub {
831 0     0     my ($self, $msg) = @_;
832 0           my ($chan, $data) = @{$msg}{qw(channel data)};
  0            
833 0           $log->tracef('Notification on channel %s containing %s', $chan, $data);
834 0           $self->db->notification($self, map $self->decode_text($_), $chan, $data);
835             }),
836 0     0     sub { $log->errorf('Unknown message %s (type %s)', $_, $_->type) }
837 0           );
838 0           $pg
839             }
840             }
841              
# Forwards every item emitted by a source to the server as COPY data.
#
# Takes the source to read from ($src). Each emitted item is logged at
# trace level and handed to the protocol handler's send_copy_data method.
# Returns whatever $src->each returns.
sub stream_from {
    my ($self, $src) = @_;
    # The protocol handler is reached through the ->protocol accessor, the
    # same accessor the query-sending methods of this class use.
    my $proto = $self->protocol;
    return $src->each(sub {
        $log->tracef('Sending %s', $_);
        # This is already UTF-8 encoded in the protocol handler,
        # since it's a text-based protocol
        $proto->send_copy_data($_);
    });
}
852              
853             =head2 set_parameter
854              
855             Marks a parameter update from the server.
856              
857             =cut
858              
# Records a parameter value under $self->{parameter}{$name}.
#
# The first value seen for a name is wrapped in a new Ryu::Observable;
# later values are pushed into that existing observable via set_string.
# Returns $self.
sub set_parameter {
    my ($self, $name, $value) = @_;
    my $known = $self->{parameter}{$name};
    unless($known) {
        $self->{parameter}{$name} = Ryu::Observable->new($value);
        return $self;
    }
    $known->set_string($value);
    return $self;
}
868              
869             =head2 idle
870              
871             Resolves when we are idle and ready to process the next request.
872              
873             =cut
874              
# Returns the future stored in $self->{idle}, creating it from
# $self->loop->new_future on first use. When that future becomes ready,
# the slot is cleared so that the next call creates a fresh one.
sub idle {
    my ($self) = @_;
    # The cleanup callback is stored on a future that $self itself holds, so
    # it is curried weakly: a plain closure over $self would form a reference
    # cycle that lasts until the future resolves.
    return $self->{idle} //= $self->loop->new_future->on_ready(
        $self->$curry::weak(sub {
            my ($self) = @_;
            delete $self->{idle};
        })
    );
}
881              
# Returns the observable kept in $self->{ready_for_query}, building it on
# first use with an initial value of 0.
#
# The observable carries one subscriber: whenever it is notified with a true
# value while $self->{idle} holds a future, that future is marked done
# (unless it is already ready).
sub ready_for_query {
    my ($self) = @_;
    return $self->{ready_for_query} if defined $self->{ready_for_query};

    my $on_change = $self->$curry::weak(sub {
        my ($engine, $state) = @_;
        my $pending = $engine->{idle};
        return unless $pending and $state;
        $pending->done unless $pending->is_ready;
    });
    return $self->{ready_for_query} = Ryu::Observable->new(0)->subscribe($on_change);
}
892              
# Starts a query through the protocol handler's simple_query method.
#
# Takes the SQL text. Dies with 'already have active query' if a query is
# already registered in $self->{active_query}. Otherwise a new
# Database::Async::Query is registered as the active query and the source
# that was passed to it as row_data is returned.
sub simple_query {
    my ($self, $sql) = @_;
    die 'already have active query' if $self->{active_query};

    my $rows = $self->ryu->source;
    my $query = Database::Async::Query->new(
        sql      => $sql,
        row_data => $rows,
    );
    $self->{active_query} = $query;
    $self->protocol->simple_query($self->encode_text($query->sql));
    return $rows;
}
903              
# Encodes a text string using the encoding reported by $self->encoding.
#
# Undefined input, or a false encoding, returns the input unchanged.
# 'UTF-8' goes through Unicode::UTF8::encode_utf8; anything else goes through
# Encode::encode with FB_CROAK.
sub encode_text {
    my ($self, $txt) = @_;
    return $txt unless defined $txt;
    my $charset = $self->encoding;
    return $txt unless $charset;
    return $charset eq 'UTF-8'
        ? Unicode::UTF8::encode_utf8($txt)
        : Encode::encode($charset, $txt, Encode::FB_CROAK);
}
910              
# Decodes a byte string using the encoding reported by $self->encoding.
#
# Undefined input, or a false encoding, returns the input unchanged.
# 'UTF-8' goes through Unicode::UTF8::decode_utf8; anything else goes through
# Encode::decode with FB_CROAK.
sub decode_text {
    my ($self, $txt) = @_;
    return $txt unless defined $txt;
    my $charset = $self->encoding;
    return $txt unless $charset;
    return $charset eq 'UTF-8'
        ? Unicode::UTF8::decode_utf8($txt)
        : Encode::decode($charset, $txt, Encode::FB_CROAK);
}
917              
# Registers $query as the active query and sends it to the protocol handler
# as the message sequence Parse, Bind, Describe, Execute, all against the
# unnamed ('') statement and portal.
#
# Unless $query->{in} is true, the sequence is finished with Close and Sync.
# The SQL text and every bind value pass through encode_text first.
#
# Dies with 'already have active query' if a query is already registered.
# Returns Future->done.
sub handle_query {
    my ($self, $query) = @_;
    die 'already have active query' if $self->{active_query};
    $self->{active_query} = $query;

    my $proto = $self->protocol;
    my @unnamed = (portal => '', statement => '');

    # Each message is sent before the next one's arguments are evaluated.
    $proto->send_message(
        Parse => (
            sql       => $self->encode_text($query->sql),
            statement => '',
        )
    );
    $proto->send_message(
        Bind => @unnamed,
        param => [ map { $self->encode_text($_) } $query->bind ],
    );
    for my $type (qw(Describe Execute)) {
        $proto->send_message($type => @unnamed);
    }

    return Future->done if $query->{in};

    for my $type (qw(Close Sync)) {
        $proto->send_message($type => @unnamed);
    }
    return Future->done;
}
958              
# Unsupported entry point: always throws, pointing callers at handle_query.
sub query {
    die 'use handle_query instead';
}
960              
# Returns the value stored in $self->{active_query}, or undef when the slot
# is empty.
sub active_query {
    my ($self) = @_;
    return $self->{active_query};
}
962              
963             1;
964              
965             __END__