File Coverage

blib/lib/Database/Async/Engine/PostgreSQL.pm
Criterion Covered Total %
statement 115 394 29.1
branch 18 132 13.6
condition 18 100 18.0
subroutine 34 95 35.7
pod 20 38 52.6
total 205 759 27.0


line stmt bran cond sub pod time code
1             package Database::Async::Engine::PostgreSQL;
2             # ABSTRACT: PostgreSQL support for Database::Async
3              
4 5     5   1241532 use strict;
  5         49  
  5         144  
5 5     5   27 use warnings;
  5         10  
  5         120  
6              
7 5     5   1032 use utf8;
  5         33  
  5         29  
8              
9             our $VERSION = '1.002';
10              
11 5     5   240 use parent qw(Database::Async::Engine);
  5         10  
  5         28  
12              
13             =encoding utf8
14              
15             =head1 NAME
16              
17             Database::Async::Engine::PostgreSQL - support for PostgreSQL databases in L
18              
19             =head1 DESCRIPTION
20              
21             Provide a C URI when instantiating L to use this engine.
22              
23             $loop->add(
24             my $dbh = Database::Async->new(
25             uri => 'postgresql://localhost'
26             )
27             );
28              
29             Connection can also be made using a service definition, as described in L.
30              
31             $loop->add(
32             my $dbh = Database::Async->new(
33             type => 'postgresql',
34             engine => {
35             service => 'example',
36             }
37             )
38             );
39              
40             If neither URI nor service are provided, the C environment variable is attempted, and will fall back
41             to localhost (similar to C behaviour).
42              
43             $loop->add(
44             my $dbh = Database::Async->new(
45             type => 'postgresql',
46             )
47             );
48              
49              
50             =cut
51              
52 5     5   42543 no indirect;
  5         1998  
  5         30  
53 5     5   1170 use Syntax::Keyword::Try;
  5         3920  
  5         30  
54 5     5   1160 use Ryu::Async;
  5         255456  
  5         171  
55 5     5   2059 use Ryu::Observable;
  5         8096  
  5         172  
56 5     5   33 use curry;
  5         13  
  5         100  
57 5     5   24 use Scalar::Util ();
  5         11  
  5         90  
58 5     5   1917 use URI::postgres;
  5         8114  
  5         192  
59 5     5   1857 use URI::QueryParam;
  5         4039  
  5         182  
60 5     5   33 use Future::AsyncAwait;
  5         11  
  5         45  
61 5     5   1043 use Database::Async::Query;
  5         6528  
  5         175  
62 5     5   2330 use File::HomeDir;
  5         26121  
  5         319  
63 5     5   2109 use Config::Tiny;
  5         5304  
  5         172  
64 5     5   37 use Encode ();
  5         10  
  5         80  
65 5     5   1997 use MIME::Base64 ();
  5         3075  
  5         167  
66 5     5   2856 use Bytes::Random::Secure ();
  5         41694  
  5         157  
67 5     5   1867 use Unicode::UTF8;
  5         2411  
  5         231  
68 5     5   2098 use Crypt::Digest::SHA256 ();
  5         19850  
  5         125  
69 5     5   2078 use Crypt::Mac::HMAC ();
  5         5286  
  5         213  
70              
71 5     5   1960 use Protocol::Database::PostgreSQL::Client qw(2.000);
  5         141704  
  5         295  
72 5     5   2204 use Protocol::Database::PostgreSQL::Constants qw(:v1);
  5         1982  
  5         695  
73              
74 5     5   37 use Log::Any qw($log);
  5         9  
  5         29  
75              
76             use overload
77 0     0   0 '""' => sub { ref(shift) },
78 0     0   0 bool => sub { 1 },
79 5     5   1256 fallback => 1;
  5         10  
  5         64  
80              
81             Database::Async::Engine->register_class(
82             postgresql => __PACKAGE__
83             );
84              
85             =head1 METHODS
86              
87             =head2 configure
88              
89             =cut
90              
91             sub configure {
92 4     4 1 10774 my ($self, %args) = @_;
93 4         10 for (qw(service encoding application_name)) {
94 12 50       27 $self->{$_} = delete $args{$_} if exists $args{$_};
95             }
96 4         15 return $self->next::method(%args);
97             }
98              
99 0     0 0 0 sub encoding { shift->{encoding} }
100              
101             =head2 connection
102              
103             Returns a L representing the database connection,
104             and will attempt to connect if we are not already connected.
105              
106             =cut
107              
108             sub connection {
109 0     0 1 0 my ($self) = @_;
110 0   0     0 $self->{connection} //= $self->connect;
111             }
112              
113             =head2 ssl
114              
115             Whether to try SSL or not, expected to be one of the following
116             values from L:
117              
118             =over 4
119              
120             =item * C
121              
122             =item * C
123              
124             =item * C
125              
126             =back
127              
128             =cut
129              
130 0     0 1 0 sub ssl { shift->{ssl} }
131              
132             =head2 read_len
133              
134             Buffer read length. Higher values mean we will attempt to read more
135             data for each I/O loop iteration.
136              
137             Defaults to 2 megabytes.
138              
139             =cut
140              
141 0   0 0 1 0 sub read_len { shift->{read_len} //= 2 * 1024 * 1024 }
142              
143             =head2 write_len
144              
145             Buffer write length. Higher values mean we will attempt to write more
146             data for each I/O loop iteration.
147              
148             Defaults to 2 megabytes.
149              
150             =cut
151              
152 0   0 0 1 0 sub write_len { shift->{write_len} //= 2 * 1024 * 1024 }
153              
154             =head2 connect
155              
156             Establish a connection to the server.
157              
158             Returns a L which resolves to the L
159             once ready.
160              
161             =cut
162              
163 0     0 1 0 async sub connect {
164 0         0 my ($self) = @_;
165 0         0 my $loop = $self->loop;
166              
167 0         0 my $connected = $self->connected;
168 0 0       0 die 'We think we are already connected, and that is bad' if $connected->as_numeric;
169              
170             # Initial connection is made directly through the URI
171             # parameters. Eventually we also want to support UNIX
172             # socket and other types.
173 0 0 0     0 $self->{uri} ||= $self->uri_for_service($self->service) if $self->service;
174 0         0 my $uri = $self->uri;
175 0 0       0 die 'bad URI' unless ref $uri;
176 0         0 $log->tracef('URI for connection is %s', "$uri");
177 0         0 my $endpoint = join ':', $uri->host, $uri->port;
178              
179 0         0 $log->tracef('Will connect to %s', $endpoint);
180 0         0 $self->{ssl} = do {
181 0   0     0 my $mode = $uri->query_param('sslmode') // 'prefer';
182 0   0     0 $Protocol::Database::PostgreSQL::Constants::SSL_NAME_MAP{$mode} // die 'unknown SSL mode ' . $mode;
183             };
184              
185             # We're assuming TCP (either v4 or v6) here, but there's not really any reason we couldn't have
186             # UNIX sockets or other transport layers here other than lack of demand so far.
187 0         0 my @connect_params;
188 0 0 0     0 if ($uri->host and not $uri->host =~ m!^[/@]!) {
    0          
189 0         0 @connect_params = (
190             service => $uri->port,
191             host => $uri->host,
192             socktype => 'stream',
193             );
194             } elsif ($uri->host eq '') {
195 0         0 @connect_params = (
196             addr => {
197             family => 'unix',
198             socktype => 'stream',
199             path => '/var/run/postgresql/.s.PGSQL.'.$uri->port,
200             }
201             );
202             } else {
203 0         0 @connect_params = (
204             addr => {
205             family => 'unix',
206             socktype => 'stream',
207             path => $uri->host.'/.s.PGSQL.'.$uri->port,
208             }
209             );
210             }
211 0         0 my $sock = await $loop->connect(@connect_params);
212              
213 0 0 0     0 if ($sock->sockdomain == Socket::PF_INET or $sock->sockdomain == Socket::PF_INET6) {
    0          
214 0         0 my $local = join ':', $sock->sockhost_service(1);
215 0         0 my $remote = join ':', $sock->peerhost_service(1);
216 0         0 $log->tracef('Connected to %s as %s from %s', $endpoint, $remote, $local);
217             } elsif ($sock->sockdomain == Socket::PF_UNIX) {
218 0         0 $log->tracef('Connected to %s as %s', $endpoint, $sock->peerpath);
219             }
220              
221             # We start with a null handler for read, because our behaviour varies depending on
222             # whether we want to go through the SSL dance or not.
223             $self->add_child(
224             my $stream = IO::Async::Stream->new(
225             handle => $sock,
226 0     0   0 on_read => sub { 0 }
227             )
228 0         0 );
229              
230             # SSL is conveniently simple: a prefix exchange before the real session starts,
231             # and the user can decide whether SSL is mandatory or optional.
232 0         0 $stream = await $self->negotiate_ssl(
233             stream => $stream,
234             );
235              
236 0         0 Scalar::Util::weaken($self->{stream} = $stream);
237             $self->outgoing->each(sub {
238 0     0   0 $log->tracef('Write bytes [%v02x]', $_);
239 0         0 $self->ready_for_query->set_string('');
240 0         0 $self->stream->write("$_");
241 0         0 return;
242 0         0 });
243 0         0 $stream->configure(
244             on_read => $self->curry::weak::on_read,
245             read_len => $self->read_len,
246             write_len => $self->write_len,
247             autoflush => 0,
248             );
249              
250 0         0 $log->tracef('Send initial request with user %s', $uri->user);
251              
252             # This is where the extensible options for initial connection are applied:
253             # we have already handled SSL by this point, so we exclude this from the
254             # list and pass everything else directly to the startup packet.
255 0         0 my %qp = $uri->query_params;
256 0         0 delete $qp{sslmode};
257              
258 0   0     0 $qp{application_name} //= $self->application_name;
259 0         0 $self->protocol->send_startup_request(
260             database => $self->database_name,
261             user => $self->database_user,
262             %qp
263             );
264 0         0 $connected->set_numeric(1);
265 0         0 return $stream;
266             }
267              
268             =head2 service_conf_path
269              
270             Return the expected location for the pg_service.conf file.
271              
272             =cut
273              
274             sub service_conf_path {
275 8     8 1 7593 my ($class) = @_;
276 8 100       34 return $ENV{PGSERVICEFILE} if exists $ENV{PGSERVICEFILE};
277 4 100       42 return $ENV{PGSYSCONFDIR} . '/pg_service.conf' if exists $ENV{PGSYSCONFDIR};
278 2         12 my $path = File::HomeDir->my_home . '/.pg_service.conf';
279 2 100       55 return $path if -r $path;
280 1         8 return '/etc/pg_service.conf';
281             }
282              
283             sub service_parse {
284 3     3 0 9 my ($class, $path) = @_;
285 3         19 return Config::Tiny->read($path, 'encoding(UTF-8)');
286             }
287              
288             sub find_service {
289 3     3 0 7018 my ($class, $srv) = @_;
290 3         11 my $data = $class->service_parse(
291             $class->service_conf_path
292             );
293 3 100       1998 die 'service ' . $srv . ' not found in config' unless $data->{$srv};
294 2         22 return $data->{$srv};
295             }
296              
297 0   0 0 0 0 sub service { shift->{service} //= $ENV{PGSERVICE} }
298              
299             sub database_name {
300 8     8 0 271 my $uri = shift->uri;
301 8   33     44 return $uri->dbname // $uri->user // 'postgres'
      50        
302             }
303              
304             sub database_user {
305 8     8 0 254 my $uri = shift->uri;
306 8   50     48 return $uri->user // 'postgres'
307             }
308              
309             sub password_from_file {
310 18     18 0 632 my $self = shift;
311 18   66     96 my $pwfile = $ENV{PGPASSFILE} || File::HomeDir->my_home . '/.pgpass';
312              
313 18 50       92 unless ($^O eq 'MSWin32') { # same as libpq
314             # libpq also does stat here instead of lstat. So, pgpass can be
315             # a logical link.
316 18 50       227 my (undef, undef, $mode) = stat $pwfile or return undef;
317 18 50       62 unless (-f _) {
318 0         0 $log->warnf("WARNING: password file \"%s\" is not a plain file\n", $pwfile);
319 0         0 return undef;
320             }
321              
322 18 100       44 if ($mode & 077) {
323 1         10 $log->warnf("WARNING: password file \"%s\" has group or world access; permissions should be u=rw (0600) or less", $pwfile);
324 1         103 return undef;
325             }
326             # libpq has the same race condition of stat versus open.
327             }
328              
329             # It's not an error for this file to be missing: it might not
330             # be readable for various reasons, but for now we ignore that case as well
331             # (we've already checked for overly-lax permissions above)
332 17 50       485 open my $fh, '<', $pwfile or return undef;
333              
334 17         376 while (defined(my $line = readline $fh)) {
335 17 50       53 next if $line =~ '^#';
336 17         28 chomp $line;
337 17 50       214 my ($host, $port, $db, $user, $pw) = ($line =~ /((?:\\.|[^:])*)(?::|$)/g)
338             or next;
339 17         73 s/\\(.)/$1/g for ($host, $port, $db, $user, $pw);
340              
341 17 50 66     142 return $pw if (
      66        
      33        
      66        
      33        
      66        
      33        
342             $host eq '*' || $host eq $self->uri->host and
343             $port eq '*' || $port eq $self->uri->port and
344             $user eq '*' || $user eq $self->database_user and
345             $db eq '*' || $db eq $self->database_name
346             );
347             }
348              
349 0         0 return undef;
350             }
351              
352             sub database_password {
353 20     20 0 27588 my $self = shift;
354 20   66     68 return $self->uri->password // $ENV{PGPASSWORD} || $self->password_from_file
355             }
356              
357             =head2 negotiate_ssl
358              
359             Apply SSL negotiation.
360              
361             =cut
362              
363 0     0 1   async sub negotiate_ssl {
364 0           my ($self, %args) = @_;
365 0           my $stream = delete $args{stream};
366              
367             # If SSL is disabled entirely, just return the same stream as-is
368 0 0         my $ssl = $self->ssl
369             or return $stream;
370              
371 0           require IO::Async::SSL;
372 0           require IO::Socket::SSL;
373              
374 0           $log->tracef('Attempting to negotiate SSL');
375 0           await $stream->write($self->protocol->ssl_request);
376              
377 0           $log->tracef('Waiting for response');
378 0           my ($resp, $eof) = await $stream->read_exactly(1);
379              
380 0 0         $log->tracef('Read %v02x from server for SSL response (EOF is %s)', $resp, $eof ? 'true' : 'false');
381 0 0         die 'Server closed connection' if $eof;
382              
383 0 0         if($resp eq 'S') {
    0          
384             # S for SSL...
385 0           $log->tracef('This is SSL, let us upgrade');
386             $stream = await $self->loop->SSL_upgrade(
387             handle => $stream,
388             # SSL defaults...
389             SSL_server => 0,
390             SSL_hostname => $self->uri->host,
391             SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_NONE(),
392             # Pass through anything SSL-related unchanged, the user knows
393             # better than we do
394 0           (map {; $_ => $self->{$_} } grep { /^SSL_/ } keys %$self)
395             );
396 0           $log->tracef('Upgrade complete');
397             } elsif($resp eq 'N') {
398             # N for "no SSL"...
399 0           $log->tracef('No to SSL');
400 0 0         die 'Server does not support SSL' if $self->ssl == SSL_REQUIRE;
401             } else {
402             # anything else is unexpected
403 0           die 'Unknown response to SSL request';
404             }
405 0           return $stream;
406             }
407              
408 0   0 0 0   sub is_replication { shift->{is_replication} //= 0 }
409 0   0 0 0   sub application_name { shift->{application_name} //= 'perl' }
410              
411             =head2 uri_for_dsn
412              
413             Returns a L corresponding to the given L.
414              
415             May throw an exception if we don't have a valid string.
416              
417             =cut
418              
419             sub uri_for_dsn {
420 0     0 1   my ($class, $dsn) = @_;
421 0 0         die 'invalid DSN, expecting DBI:Pg:...' unless $dsn =~ s/^DBI:Pg://i;
422 0           my %args = split /[=;]/, $dsn;
423 0           my $uri = URI->new('postgresql://postgres@localhost/postgres');
424 0           $uri->$_(delete $args{$_}) for grep exists $args{$_}, qw(host port user password dbname);
425 0           $uri
426             }
427              
428             sub uri_for_service {
429 0     0 0   my ($class, $service) = @_;
430 0           my $cfg = $class->find_service($service);
431              
432             # Start with common default values (i.e. follow libpq behaviour unless there's a strong reason not to)
433 0           my $uri = URI->new('postgresql://postgres@localhost/postgres');
434              
435             # Standard fields supported by URI::pg
436 0           $uri->$_(delete $cfg->{$_}) for grep exists $cfg->{$_}, qw(host port user password dbname);
437             # ... note that `hostaddr` takes precedence over plain `host`
438 0 0         $uri->host(delete $cfg->{hostaddr}) if exists $cfg->{hostaddr};
439              
440             # Everything else is handled via query parameters, this list is non-exhaustive and likely to be
441             # extended in future (e.g. text/binary protocol mode)
442 0           $uri->query_param($_ => delete $cfg->{$_}) for grep exists $cfg->{$_}, qw(
443             application_name
444             fallback_application_name
445             keepalives
446             options
447             sslmode
448             replication
449             );
450 0           $uri
451             }
452              
453             =head2 stream
454              
455             The L representing the database connection.
456              
457             =cut
458              
459 0     0 1   sub stream { shift->{stream} }
460              
461             =head2 on_read
462              
463             Process incoming database packets.
464              
465             Expects the following parameters:
466              
467             =over 4
468              
469             =item * C<$stream> - the L we are receiving data on
470              
471             =item * C<$buffref> - a scalar reference to the current input data buffer
472              
473             =item * C<$eof> - true if we have reached the end of input
474              
475             =back
476              
477             =cut
478              
479             sub on_read {
480 0     0 1   my ($self, $stream, $buffref, $eof) = @_;
481              
482             try {
483             $log->tracef('Have server message of length %d', length $$buffref);
484             while(my $msg = $self->protocol->extract_message($buffref)) {
485             $log->tracef('Message: %s', $msg);
486             $self->incoming->emit($msg);
487             }
488             } catch($e) {
489             # This really shouldn't happen, but since we can't trust our current state we should drop
490             # the connection ASAP, and avoid any chance of barrelling through to a COMMIT or other
491             # risky operation.
492             $log->errorf('Failed to handle read, connection is no longer in a valid state: %s', $e);
493             $stream->close_now;
494             } finally {
495             $self->connected->set_numeric(0) if $eof;
496             }
497 0           return 0;
  0            
498             }
499              
500             =head2 ryu
501              
502             Provides a L instance.
503              
504             =cut
505              
506             sub ryu {
507 0     0 1   my ($self) = @_;
508 0   0       $self->{ryu} //= do {
509 0           $self->add_child(
510             my $ryu = Ryu::Async->new
511             );
512 0           $ryu
513             }
514             }
515              
516             =head2 outgoing
517              
518             L representing outgoing packets for the current database connection.
519              
520             =cut
521              
522             sub outgoing {
523 0     0 1   my ($self) = @_;
524 0   0       $self->{outgoing} //= $self->ryu->source;
525             }
526              
527             =head2 incoming
528              
529             L representing incoming packets for the current database connection.
530              
531             =cut
532              
533             sub incoming {
534 0     0 1   my ($self) = @_;
535 0   0       $self->{incoming} //= $self->ryu->source;
536             }
537              
538             =head2 connected
539              
540             A L which will be 1 while the connection is in a valid state,
541             and 0 if we're disconnected.
542              
543             =cut
544              
545             sub connected {
546 0     0 1   my ($self) = @_;
547 0   0       $self->{connected} //= do {
548 0           my $obs = Ryu::Observable->new(0);
549             $obs->subscribe(
550             $self->$curry::weak(sub {
551 0     0     my ($self, $v) = @_;
552             # We only care about disconnection events
553 0 0         return if $v;
554              
555             # If we were doing something, then it didn't work
556 0 0         if(my $query = delete $self->{active_query}) {
557 0 0         $query->completed->fail('disconnected') unless $query->completed->is_ready;
558             }
559              
560             # Tell the database pool management that we're no longer useful
561 0 0         if(my $db = $self->db) {
562 0           $db->engine_disconnected($self);
563             }
564             })
565 0           );
566 0           $obs
567             };
568             }
569              
570             =head2 authenticated
571              
572             Resolves once database authentication is complete.
573              
574             =cut
575              
576             sub authenticated {
577 0     0 1   my ($self) = @_;
578 0   0       $self->{authenticated} //= $self->loop->new_future;
579             }
580              
581             # Handlers for authentication messages from backend.
582             our %AUTH_HANDLER = (
583             AuthenticationOk => sub {
584             my ($self, $msg) = @_;
585             $self->authenticated->done;
586             },
587             AuthenticationKerberosV5 => sub {
588             my ($self, $msg) = @_;
589             die "Not yet implemented";
590             },
591             AuthenticationCleartextPassword => sub {
592             my ($self, $msg) = @_;
593             $self->protocol->send_message(
594             'PasswordMessage',
595             user => $self->encode_text($self->uri->user),
596             password_type => 'plain',
597             password => $self->encode_text($self->database_password),
598             );
599             },
600             AuthenticationMD5Password => sub {
601             my ($self, $msg) = @_;
602             $self->protocol->send_message(
603             'PasswordMessage',
604             user => $self->encode_text($self->uri->user),
605             password_type => 'md5',
606             password_salt => $msg->password_salt,
607             password => $self->encode_text($self->database_password),
608             );
609             },
610             AuthenticationSCMCredential => sub {
611             my ($self, $msg) = @_;
612             die "Not yet implemented";
613             },
614             AuthenticationGSS => sub {
615             my ($self, $msg) = @_;
616             die "Not yet implemented";
617             },
618             AuthenticationSSPI => sub {
619             my ($self, $msg) = @_;
620             die "Not yet implemented";
621             },
622             AuthenticationGSSContinue => sub {
623             my ($self, $msg) = @_;
624             die "Not yet implemented";
625             },
626             AuthenticationSASL => sub {
627             my ($self, $msg) = @_;
628             $log->tracef('SASL starts');
629             my $nonce = MIME::Base64::encode_base64(Bytes::Random::Secure::random_string_from(join('', ('a'..'z'), ('A'..'Z'), ('0'..'9')), 18), '');
630             $self->{client_first_message} = 'n,,n=,r=' . $nonce;
631             $self->protocol->send_message(
632             'SASLInitialResponse',
633             mechanism => 'SCRAM-SHA-256',
634             nonce => $nonce,
635             );
636             },
637             AuthenticationSASLContinue => sub {
638             my ($self, $msg) = @_;
639             $log->tracef('Have msg %s', $msg);
640              
641             my $rounds = $msg->password_rounds or die 'need iteration count';
642              
643             my $server_first_message = $msg->server_first_message;
644             my $pass = Unicode::UTF8::encode_utf8($self->database_password);
645             my $salted_password = do {
646             my $hash = Crypt::Mac::HMAC::hmac('SHA256', $pass, MIME::Base64::decode_base64($msg->password_salt), pack('N1', 1));
647             my $out = $hash;
648             # Skip the first round - that's our original $hash value - and recursively re-hash
649             # for the remainder, incrementally building our bitwise XOR result
650             for my $idx (1..$rounds-1) {
651             $hash = Crypt::Mac::HMAC::hmac('SHA256', $pass, $hash);
652             $out = "$out" ^ "$hash";
653             }
654             $out
655             };
656             # The client key uses the literal string 'Client Key' as the base - for the server, it'd be 'Server Key'
657             my $client_key = Crypt::Mac::HMAC::hmac('SHA256', $salted_password, "Client Key");
658             my $server_key = Crypt::Mac::HMAC::hmac('SHA256', $salted_password, "Server Key");
659             # Then we hash this to get the stored key, which will be used for the signature
660             my $stored_key = Crypt::Digest::SHA256::sha256($client_key);
661              
662             my $client_first_message = $self->{client_first_message};
663             # Strip out the channel-binding GS2 header
664             my $header = 'n,,';
665             $client_first_message =~ s{^\Q$header}{};
666             # ... but we _do_ want the header in the final-message c= GS2 component
667             my $client_final_message = 'c=' . MIME::Base64::encode_base64($header, '') . ',r=' . $msg->password_nonce;
668              
669             # this is what we want to sign!
670             my $auth_message = join ',', $client_first_message, $server_first_message, $client_final_message;
671             $log->tracef('Auth message = %s', $auth_message);
672              
673             my $client_signature = Crypt::Mac::HMAC::hmac('SHA256', $stored_key, $auth_message);
674             my $client_proof = "$client_key" ^ "$client_signature";
675             $log->tracef('Client proof is %s', $client_proof);
676             my $server_signature = Crypt::Mac::HMAC::hmac('SHA256', $server_key, $auth_message);
677             $self->{expected_server_signature} = $server_signature;
678             $self->protocol->send_message(
679             'SASLResponse',
680             header => $header,
681             nonce => $msg->password_nonce,
682             proof => $client_proof,
683             );
684             },
685             AuthenticationSASLFinal => sub {
686             my ($self, $msg) = @_;
687             my $expected = MIME::Base64::encode_base64($self->{expected_server_signature}, '');
688             die 'invalid server signature ' . $msg->server_signature . ', expected ' . $expected unless $msg->server_signature eq $expected;
689             $log->tracef('Server signature seems fine, continue with auth');
690             # No further action required, we'll get an AuthenticationOk immediately after this
691             }
692             );
693              
694             =head2 protocol
695              
696             Returns the L instance, creating it
697             and setting up event handlers if necessary.
698              
699             =cut
700              
701             sub protocol {
702 0     0 1   my ($self) = @_;
703 0   0       $self->{protocol} //= do {
704 0           my $pg = Protocol::Database::PostgreSQL::Client->new(
705             database => $self->database_name,
706             outgoing => $self->outgoing,
707             );
708             $self->incoming
709             ->switch_str(
710 0     0     sub { $_->type },
711             authentication_request => $self->$curry::weak(sub {
712 0     0     my ($self, $msg) = @_;
713 0           $log->tracef('Auth request received: %s', $msg);
714 0 0         my $code = $AUTH_HANDLER{$msg->auth_type}
715             or $log->errorf('unknown auth type %s', $msg->auth_type);
716 0           $self->$code($msg);
717             }),
718             password => $self->$curry::weak(sub {
719 0     0     my ($self, %args) = @_;
720 0           $log->tracef('Auth request received: %s', \%args);
721 0           $self->protocol->{user} = $self->uri->user;
722 0           $self->protocol->send_message('PasswordMessage', password => $self->encode_text($self->database_password));
723             }),
724             parameter_status => $self->$curry::weak(sub {
725 0     0     my ($self, $msg) = @_;
726 0           $log->tracef('Parameter received: %s', $msg);
727 0           $self->set_parameter(map $self->decode_text($_), $msg->key => $msg->value);
728             }),
729             row_description => $self->$curry::weak(sub {
730 0     0     my ($self, $msg) = @_;
731 0           $log->tracef('Row description %s', $msg);
732 0 0         $log->errorf('No active query?') unless my $q = $self->active_query;
733 0           $q->row_description($msg->description);
734             }),
735             data_row => $self->$curry::weak(sub {
736 0     0     my ($self, $msg) = @_;
737 0           $log->tracef('Have row data %s', $msg);
738             $self->{fc} ||= $self->active_query->row_data->flow_control->each($self->$curry::weak(sub {
739 0           my ($self) = @_;
740 0 0         $log->tracef('Flow control event - will %s stream', $_ ? 'resume' : 'pause');
741 0 0         $self->stream->want_readready($_) if $self->stream;
742 0   0       }));
743 0           $self->active_query->row([ map $self->decode_text($_), $msg->fields ]);
744             }),
745             command_complete => $self->$curry::weak(sub {
746 0     0     my ($self, $msg) = @_;
747 0           delete $self->{fc};
748 0 0         my $query = delete $self->{active_query} or do {
749 0           $log->warnf('Command complete but no query');
750 0           return;
751             };
752 0           $log->tracef('Completed query %s with result %s', $query, $msg->result);
753 0 0         $query->done unless $query->completed->is_ready;
754             }),
755             no_data => $self->$curry::weak(sub {
756 0     0     my ($self, $msg) = @_;
757 0           $log->tracef('Completed query %s with no data', $self->active_query);
758             # my $query = delete $self->{active_query};
759             # $query->done if $query;
760             }),
761             send_request => $self->$curry::weak(sub {
762 0     0     my ($self, $msg) = @_;
763 0           $log->tracef('Send request for %s', $msg);
764 0           $self->stream->write($msg);
765             }),
766             ready_for_query => $self->$curry::weak(sub {
767 0     0     my ($self, $msg) = @_;
768 0           $log->tracef('Ready for query, state is %s', $msg->state);
769 0           $self->ready_for_query->set_string($msg->state);
770 0 0         $self->db->engine_ready($self) if $self->db;
771             }),
772             backend_key_data => $self->$curry::weak(sub {
773 0     0     my ($self, $msg) = @_;
774 0           $log->tracef('Backend key data: pid %d, key 0x%08x', $msg->pid, $msg->key);
775             }),
776             parse_complete => $self->$curry::weak(sub {
777 0     0     my ($self, $msg) = @_;
778 0           $log->tracef('Parsing complete for query %s', $self->active_query);
779             }),
780             bind_complete => $self->$curry::weak(sub {
781 0     0     my ($self, $msg) = @_;
782 0           $log->tracef('Bind complete for query %s', $self->active_query);
783             }),
784             close_complete => $self->$curry::weak(sub {
785 0     0     my ($self, $msg) = @_;
786 0           delete $self->{fc};
787 0           $log->tracef('Close complete for query %s', $self->active_query);
788             }),
789             empty_query_response => $self->$curry::weak(sub {
790 0     0     my ($self, $msg) = @_;
791 0           $log->tracef('Query returned no results for %s', $self->active_query);
792             }),
793             error_response => $self->$curry::weak(sub {
794 0     0     my ($self, $msg) = @_;
795 0 0         if(my $query = $self->active_query) {
796 0           $log->warnf('Query returned error %s for %s', $msg->error, $self->active_query);
797 0           my $f = $query->completed;
798 0 0         $f->fail($msg->error) unless $f->is_ready;
799             } else {
800 0           $log->errorf('Received error %s with no active query', $msg->error);
801             }
802             }),
803             copy_in_response => $self->$curry::weak(sub {
804 0     0     my ($self, $msg) = @_;
805 0           my $query = $self->active_query;
806 0           $log->tracef('Ready to copy data for %s', $query);
807 0           my $proto = $self->protocol;
808             {
809 0           my $src = $query->streaming_input;
  0            
810             $src->completed
811             ->on_ready(sub {
812 0           my ($f) = @_;
813 0           $log->tracef('Sending copy done notification, stream status was %s', $f->state);
814 0           $proto->send_message(
815             'CopyDone',
816             data => '',
817             );
818 0           $proto->send_message(
819             'Close',
820             portal => '',
821             statement => '',
822             );
823 0           $proto->send_message(
824             'Sync',
825             portal => '',
826             statement => '',
827             );
828 0           });
829             $src->each(sub {
830 0           $log->tracef('Sending %s', $_);
831 0           $proto->send_copy_data($_);
832 0           });
833             }
834 0 0         $query->ready_to_stream->done unless $query->ready_to_stream->is_ready;
835             }),
836             copy_out_response => $self->$curry::weak(sub {
837 0     0     my ($self, $msg) = @_;
838 0           $log->tracef('copy out starts %s', $msg);
839             # $self->active_query->row([ $msg->fields ]);
840             }),
841             copy_data => $self->$curry::weak(sub {
842 0     0     my ($self, $msg) = @_;
843 0           $log->tracef('Have copy data %s', $msg);
844 0 0         my $query = $self->active_query or do {
845 0           $log->warnf('No active query for copy data');
846 0           return;
847             };
848 0           $query->row([ map $self->decode_text($_), @$_ ]) for $msg->rows;
849             }),
850             copy_done => $self->$curry::weak(sub {
851 0     0     my ($self, $msg) = @_;
852 0           $log->tracef('Copy done - %s', $msg);
853             }),
854             notification_response => $self->$curry::weak(sub {
855 0     0     my ($self, $msg) = @_;
856 0           my ($chan, $data) = @{$msg}{qw(channel data)};
  0            
857 0           $log->tracef('Notification on channel %s containing %s', $chan, $data);
858 0           $self->db->notification($self, map $self->decode_text($_), $chan, $data);
859             }),
860 0     0     sub { $log->errorf('Unknown message %s (type %s)', $_, $_->type) }
861 0           );
862 0           $pg
863             }
864             }
865              
866             sub stream_from {
867 0     0 0   my ($self, $src) = @_;
868 0           my $proto = $self->proto;
869             $src->each(sub {
870 0     0     $log->tracef('Sending %s', $_);
871             # This is already UTF-8 encoded in the protocol handler,
872             # since it's a text-based protocol
873 0           $proto->send_copy_data($_);
874             })
875 0           }
876              
877             =head2 set_parameter
878              
879             Marks a parameter update from the server.
880              
881             =cut
882              
883             sub set_parameter {
884 0     0 1   my ($self, $k, $v) = @_;
885 0 0         if(my $param = $self->{parameter}{$k}) {
886 0           $param->set_string($v);
887             } else {
888 0           $self->{parameter}{$k} = Ryu::Observable->new($v);
889             }
890 0           $self
891             }
892              
893             =head2 idle
894              
895             Resolves when we are idle and ready to process the next request.
896              
897             =cut
898              
899             sub idle {
900 0     0 1   my ($self) = @_;
901             $self->{idle} //= $self->loop->new_future->on_ready(sub {
902             delete $self->{idle}
903 0   0 0     });
  0            
904             }
905              
906             sub ready_for_query {
907 0     0 0   my ($self) = @_;
908 0   0       $self->{ready_for_query} //= do {
909             Ryu::Observable->new(0)->subscribe($self->$curry::weak(sub {
910 0     0     my ($self, $v) = @_;
911 0 0 0       return unless my $idle = $self->{idle} and $v;
912 0 0         $idle->done unless $idle->is_ready;
913 0           }))
914             }
915             }
916              
917             sub simple_query {
918 0     0 0   my ($self, $sql) = @_;
919 0 0         die 'already have active query' if $self->{active_query};
920 0           $self->{active_query} = my $query = Database::Async::Query->new(
921             sql => $sql,
922             row_data => my $src = $self->ryu->source
923             );
924 0           $self->protocol->simple_query($self->encode_text($query->sql));
925 0           return $src;
926             }
927              
928             sub encode_text {
929 0     0 0   my ($self, $txt) = @_;
930 0 0 0       return $txt unless defined $txt and my $encoding = $self->encoding;
931 0 0         return Unicode::UTF8::encode_utf8($txt) if $encoding eq 'UTF-8';
932 0           return Encode::encode($encoding, $txt, Encode::FB_CROAK);
933             }
934              
935             sub decode_text {
936 0     0 0   my ($self, $txt) = @_;
937 0 0 0       return $txt unless defined $txt and my $encoding = $self->encoding;
938 0 0         return Unicode::UTF8::decode_utf8($txt) if $encoding eq 'UTF-8';
939 0           return Encode::decode($encoding, $txt, Encode::FB_CROAK);
940             }
941              
942             sub handle_query {
943 0     0 0   my ($self, $query) = @_;
944 0 0         die 'already have active query' if $self->{active_query};
945 0           $self->{active_query} = $query;
946 0           my $proto = $self->protocol;
947 0           $proto->send_message(
948             'Parse',
949             sql => $self->encode_text($query->sql),
950             statement => '',
951             );
952 0           $proto->send_message(
953             'Bind',
954             portal => '',
955             statement => '',
956             param => [ map $self->encode_text($_), $query->bind ],
957             );
958 0           $proto->send_message(
959             'Describe',
960             portal => '',
961             statement => '',
962             );
963 0           $proto->send_message(
964             'Execute',
965             portal => '',
966             statement => '',
967             );
968 0 0         unless($query->{in}) {
969 0           $proto->send_message(
970             'Close',
971             portal => '',
972             statement => '',
973             );
974 0           $proto->send_message(
975             'Sync',
976             portal => '',
977             statement => '',
978             );
979             }
980             Future->done
981 0           }
982              
983 0     0 1   sub query { die 'use handle_query instead'; }
984              
985 0     0 0   sub active_query { shift->{active_query} }
986              
987             =head2 _remove_from_loop
988              
989             Called when this engine instance is removed from the main event loop, usually just before the instance
990             is destroyed.
991              
992             Since we could be in various states of authentication or query processing, we potentially have many
993             different elements to clean up here. We do these explicitly so that we can apply some ordering to the events:
994             clear out things that relate to queries before dropping the connection, for example.
995              
996             =cut
997              
998             sub _remove_from_loop {
999 0     0     my ($self, $loop) = @_;
1000 0 0         if(my $query = delete $self->{active_query}) {
1001 0 0         $query->fail('disconnected') unless $query->completed->is_ready;
1002             }
1003 0 0         if(my $idle = delete $self->{idle}) {
1004 0           $idle->cancel;
1005             }
1006 0 0         if(my $auth = delete $self->{authenticated}) {
1007 0           $auth->cancel;
1008             }
1009 0 0         if(my $connected = delete $self->{connected}) {
1010 0           $connected->finish;
1011             }
1012 0 0         if(my $outgoing = delete $self->{outgoing}) {
1013 0 0         $outgoing->finish unless $outgoing->is_ready;
1014             }
1015 0 0         if(my $incoming = delete $self->{incoming}) {
1016 0 0         $incoming->finish unless $incoming->is_ready;
1017             }
1018 0 0         if(my $stream = delete $self->{stream}) {
1019 0           $stream->close_now;
1020 0           $stream->remove_from_parent;
1021             }
1022 0 0         if(my $conn = delete $self->{connection}) {
1023 0           $conn->cancel;
1024             }
1025             # Observable connection parameters - signal that no further updates are expected
1026 0           for my $k (keys %{$self->{parameter}}) {
  0            
1027 0           my $param = delete $self->{parameter}{$k};
1028 0 0         $param->finish if $param;
1029             }
1030 0 0         if(my $ryu = delete $self->{ryu}) {
1031 0           $ryu->remove_from_parent;
1032             }
1033 0           delete $self->{protocol};
1034 0           return $self->next::method($loop);
1035             }
1036              
1037             1;
1038              
1039             __END__