File Coverage

blib/lib/PGObject/Util/Replication/Standby.pm
Criterion Covered Total %
statement 88 113 77.8
branch 17 38 44.7
condition 5 10 50.0
subroutine 14 18 77.7
pod 7 7 100.0
total 131 186 70.4


line stmt bran cond sub pod time code
1             package PGObject::Util::Replication::Standby;
2              
3 5     5   59005 use 5.006;
  5         20  
4 5     5   28 use strict;
  5         11  
  5         117  
5 5     5   27 use warnings;
  5         18  
  5         138  
6 5     5   2787 use URI;
  5         41117  
  5         202  
7 5     5   3066 use URI::QueryParam;
  5         4389  
  5         183  
8 5     5   38 use Carp;
  5         17  
  5         456  
9 5     5   3315 use Moo;
  5         75323  
  5         35  
10             extends 'PGObject::Util::Replication::SMO';
11              
12              
13             =head1 NAME
14              
15             PGObject::Util::Replication::Standby - Manage PG replication standbys
16              
17             =head1 VERSION
18              
19             Version 0.02
20              
21             =cut
22              
23             our $VERSION = '0.02';
24              
25              
26             =head1 SYNOPSIS
27              
28              
29             use PGObject::Util::Replication::Standby;
30              
31             my $replica = PGObject::Util::Replication::Standby->new();
32             $replica->standby_name('denver', 1); # uses slot denver
33             $replica->upstream_host('pgmain.chicago.mydomain.foo');
34              
35             #however you may be better off setting cert auth instead.
36             $replica->credentials('foo', 'superdupersecret');
37              
38             # finally get the recovery.conf contents
39             $replica->recoveryconf_contents();
40              
41             ### manage replication slots
42             # clearing all slots, for example failing over
43             $replica->clearslots();
44              
45             # list all slots
46             $replica->slots();
47              
48             # add new slot
49             $replica->addslot('downstream1');
50              
51             # delete slot
52             $replica->deleteslot('downstream2');
53            
54              
55             #also ways to measure recovery lag
56             my $lsn = $replica->recovery_lsn(); # current recovery log location
57             $replica->lag_bytes_from($lsn);
58              
59             # Promote to master
60             $replica->promote();
61              
62             # we can also get the master from the connection string, for example to look up the
63             # wal segments
64              
65             my $wal_info = $replica->master->ping_wall();
66              
67             =head1 DESCRIPTION AND USE
68              
69             This module manages replication-related functions on standbys.
70              
71             A I<standby> is a physical replica (i.e. data files are brought to the same
72             structure). Logical replication in this case is not supported in terms of
73             failover and the like.
74              
75             This module was written to make the task of managing replicated systems from
76             Rex much easier. The module thus supports the three basic aspects of
77             replication management:
78              
79             =over
80              
81             =item Configuration management of upstream and downstream links
82              
83             =item WAL telemetry on the receiving end, and calculating lag
84              
85             =item Promotion of a standby in a failover case.
86              
87             =back
88              
89             =head1 STANDBY PROPERTIES
90              
91             All of those of an SMO plus
92              
93             =head2 recoveryconf
94              
95             The config manager for the PostgreSQL recovery.conf file (a PGObject::Util::PGConfig object, built lazily).
96              
97             =cut
98              
99             has recoveryconf => (is => 'lazy');
100              
101             my $recovery_vars = [qw(
102             recovery_command archive_cleanup_command recovery_end_command
103             recovery_target recovery_target_name recovery_target_time recovery_target_xid
104             recovery_target_inclusive recovery_target_timeline recovery_target_action
105             standby_mode primary_conninfo primary_slot_name trigger_file
106             recovery_min_apply_delay
107             )];
108              
109             sub _build_recoveryconf {
110 2     2   4333 my ($self) = @_;
111 2         16 return PGObject::Util::PGConfig->new( $recovery_vars );
112             }
113              
114             =head2 upstream_host
115              
116             =head2 upstream_port
117              
118             =head2 upstream_user
119              
120             =head2 upstream_password
121              
122             =head2 upstream_database
123              
124             =head2 standby_name
125              
126             =head2 recoveryconf_path
127              
128             Path of the most recently loaded recovery.conf (or one set explicitly). This is
129             the file that is removed when promoting a standby without a trigger file.
130              
131             =cut
132              
133             has upstream_host => (is => 'rw', );
134             has upstream_port => (is => 'rw', default => 5432);
135             has upstream_user => (is => 'rw', );
136             has upstream_password => (is => 'rw', );
137             has upstream_database => (is => 'rw', default => 'postgres');
138             has standby_name => (is => 'rw', );
139             has recoveryconf_path => (is => 'rw');
140              
141             =head1 METHODS
142              
143             =head2 Recovery Configuration
144              
145             Recovery configuration here provides a basic interface for working with the parameters
146             in the recovery.conf file. Note that this file cannot be managed via ALTER SYSTEM,
147             so a physical file must be generated even once ALTER SYSTEM is supported in PGObject::Util::PGConfig.
148              
149             =head3 set_recovery_param($name, $value)
150              
151             Sets the parameter for the recovery.conf
152              
153             =cut
154              
155             sub set_recovery_param {
156 1     1 1 13 my ($self, $name, $value) = @_;
157 1         25 $self->recoveryconf->set($name, $value);
158             }
159              
160             =head3 connection_string
161              
162             =head3 connection_string($cstring)
163              
164             Generates the connection string from the current attributes for the SMO. If $cstring is given, it is parsed first and the upstream attributes are updated from it.
165              
166             We accept reading both formats (key/value and URI). We always write URIs.
167              
168             This function in either form has the side effect of updating the
169             primary_conninfo field in the recoveryconf property.
170              
171             =cut
172              
173             sub connection_string {
174 14     14 1 3167 my ($self, $cstring) = @_;
175 14 50       39 return _set_connection_string(@_) if $cstring;
176 14         26 my $base = "postgresql://";
177 14         54 my $uri = URI->new($base);
178 14         9764 my $authority = $self->upstream_user;
179 14 100 66     61 $authority .= ":" . $self->upstream_password if $authority and $self->upstream_password;
180 14         43 $authority = join'@', grep {$_} ($authority, $self->upstream_host);
  28         81  
181 14 100       59 $uri->authority($authority) if $authority;
182 14         499 $uri->path($self->upstream_database);
183 14 50       600 $uri->query_form({application_name => $self->standby_name}) if $self->standby_name;
184              
185 14         311 $self->recoveryconf->set('primary_conninfo', $uri->as_string);
186 14         288 return $uri->as_string;
187             }
188              
189             sub _set_connection_string {
190 2     2   43 my ($self, $cstring) = @_;
191 2   50     11 $cstring //= '';
192 2 100       11 if ("$cstring" =~ m#^postgresql://#){
193 1         7 my $uri = URI->new($cstring);
194 1         302 my $authority = $uri->authority;
195 1         11 my $host;
196 1 50       5 if ($authority =~/\@/){
197 0         0 ($authority, $host) = split /\@/, $authority;
198             } else {
199 1         2 $host = $authority;
200 1         2 undef $authority;
201             }
202 1 50       4 $self->credentials(split /:/, $authority) if $authority;
203 1         3 my $dbname = $uri->path;
204 1         11 $dbname =~ s#^/##;
205 1         2 my $port;
206 1 50 33     7 ($host, $port) = split /:/, $host if $host and $host =~ /:\d+$/;;
207 1         4 $self->upstream_database($dbname);
208 1         4 $self->upstream_host($host);
209 1         2 $self->upstream_port($port);
210 1 50       15 $self->standby_name($uri->query_param('application_name'))
211             if $uri->query_param('application_name');
212             } else { # key/value format
213 1         3 my %args;
214 1         3 my $old_cstring = 'totally invalid value';
215 1         5 while (length($cstring)) {
216 1 50       4 die "failed parsing $cstring" if $old_cstring eq $cstring;
217 1         3 $old_cstring = $cstring;
218 1         6 $cstring =~ s/^([^=]+)=\s*//;
219 1   50     6 my $key = $1 // '';
220 1         3 my $value;
221 1 50       6 if ($cstring =~ /^'/){
222 1         7 $cstring =~ s/'((?:[^']|'')*)'\s*//;
223 1         3 $value = $1;
224             } else {
225 0         0 $cstring =~ s/(\S+)\s*//;
226 0         0 $value = $1;
227             }
228 1 50       8 $args{$key} = $value if $key;
229             }
230 1         9 $self->upstream_host($args{host});
231 1         7 $self->upstream_port($args{port});
232 1         7 $self->upstream_user($args{user});
233 1         7 $self->upstream_password($args{password});
234 1         5 $self->upstream_database($args{dbname});
235 1 50       6 $self->standby_name($args{application_name}) if $args{application_name};
236             }
237 2         39 return $self->connection_string;
238             }
239              
240              
241             =head3 from_recoveryconf($path)
242              
243             Sets all appropriate parameters from a given recovery.conf at a valid path.
244              
245             This will normalize the connection string to URI format.
246              
247             =cut
248              
249             sub from_recoveryconf {
250 2     2 1 1002 my ($self, $path) = @_;
251 2         58 $self->recoveryconf->fromfile($path);
252 2         522 $self->recoveryconf_path($path);
253 2         51 $self->_set_connection_string($self->recoveryconf->get_value('primary_conninfo'));
254             }
255              
256             =head3 recoveryconf_contents
257              
258             Returns the contents of the recovery.conf to be used.
259              
260             =cut
261              
262             sub recoveryconf_contents {
263 4     4 1 2659 my ($self) = @_;
264 4         97 $self->recoveryconf->set('standby_mode', 1);
265 4 50       78 $self->set_recovery_param('primary_slot_name', $self->standby_name)
266             if $self->standby_name;
267 4         15 $self->connection_string;
268 4         110 return $self->recoveryconf->filecontents . "\n";
269             }
270              
271             =head3 credentials($user, $pass)
272              
273             Sets the username and password.
274              
275             =cut
276              
277             sub credentials {
278 1     1 1 671 my ($self, $user, $pass) = @_;
279 1         5 $self->upstream_user($user);
280 1         4 $self->upstream_password($pass);
281 1         3 $self->connection_string;
282 1         6 return;
283             }
284              
285             =head2 WAL telemetry
286              
287             WAL telemetry works differently on standby than on a master. The standby is not in charge
288             of writes and so there is no "current" wal location. Instead we go by the latest received
289             location.
290              
291             This has a number of important implications. After STONITH, we can quickly poll a set of replicas to see who is most current and redirect traffic there. This is most useful in a server down
292             situation so you can ensure that the most recent replica is failed over to.
293              
294             This can also be used to check WAL telemetry against that on the master to see if there are
295             slow links regarding non-synchronous standby servers and the like.
296              
297             =head3 lag_bytes_from($lsn)
298              
299             Returns the number of bytes passed on the recovery connection between the
300             log sequence number (LSN) and the current recovery position.
301              
302             =cut
303              
304             sub lag_bytes_from {
305 0     0 1   my ($self, $lsn) = @_;
306 0           my $dbh = $self->connect;
307 0           my $sth = $dbh->prepare("SELECT ?::pg_lsn - pg_last_xlog_receive_location()");
308 0           $sth->execute($lsn);
309 0           return ($sth->fetchrow_array)[0];
310             }
311              
312             =head2 Upstream traversal
313              
314             =head3 upstream()
315              
316             Provides a generic SMO for the immediate upstream server.
317              
318             =head3 master()
319              
320             Traverses upstream until it finds a server which is not recovering and returns a Master SMO for
321             that server.
322              
323             =head2 Promotion
324              
325             Promotion can be done in this case if we can touch a trigger file specified in the recovery.conf
326             or if we can remove the recovery.conf and restart PostgreSQL.
327              
328             =head3 promote($method)
329              
330             Promotes a standby to master. First tries the trigger file if available.
331             Otherwise tries to remove the recovery.conf and restart. Methods tried are:
332              
333             =over
334              
335             =item trigger: write to trigger file
336              
337             =item recoveryconf: delete recovery.conf
338              
339             =back
340              
341             We can only restart if PGObject::Util::Replication::SMO supports
342             restarting the cluster.
343              
344             =cut
345              
346             sub _promote_trigger {
347 0     0     my ($self) = @_;
348 0           local $!;
349 0           my $trigger = $self->recoveryconf->get_value('trigger_file');
350 0 0         return unless $trigger;
351 0           warn $trigger;
352 0 0         open(my $fh, '>', $trigger) or die $!;
353 0           print $fh "\n";
354 0           close $fh;
355 0           return 1;
356             }
357              
358             sub _promote_recoveryconf {
359 0     0     my ($self) = @_;
360 0 0         croak 'No recoveryconf_path set up' unless $self->recoveryconf_path;
361 0           return unlink $self->recoveryconf_path;
362             }
363              
364              
365             sub promote {
366 0     0 1   my ($self) = @_;
367 0 0         return $self->_promote_trigger
368             if $self->recoveryconf->get_value('trigger_file');
369 0           my $retval = $self->_promote_recoveryconf();
370 0 0         $self->restart if $self->can('restart'); # if supported by SMO
371 0           return $retval;
372             }
373              
374             =head1 AUTHOR
375              
376             Chris Travers, C<< >>
377              
378             =head1 BUGS
379              
380             Please report any bugs or feature requests to C, or through
381             the web interface at L. I will be notified, and then you'll
382             automatically be notified of progress on your bug as I make changes.
383              
384              
385              
386              
387             =head1 SUPPORT
388              
389             You can find documentation for this module with the perldoc command.
390              
391             perldoc PGObject::Util::Replication::Standby
392              
393              
394             You can also look for information at:
395              
396             =over 4
397              
398             =item * RT: CPAN's request tracker (report bugs here)
399              
400             L
401              
402             =item * AnnoCPAN: Annotated CPAN documentation
403              
404             L
405              
406             =item * CPAN Ratings
407              
408             L
409              
410             =item * Search CPAN
411              
412             L
413              
414             =back
415              
416              
417             =head1 ACKNOWLEDGEMENTS
418              
419              
420             =head1 LICENSE AND COPYRIGHT
421              
422             Copyright 2017 Adjust.com
423              
424             This program is distributed under the (Revised) BSD License:
425             L
426              
427             Redistribution and use in source and binary forms, with or without
428             modification, are permitted provided that the following conditions
429             are met:
430              
431             * Redistributions of source code must retain the above copyright
432             notice, this list of conditions and the following disclaimer.
433              
434             * Redistributions in binary form must reproduce the above copyright
435             notice, this list of conditions and the following disclaimer in the
436             documentation and/or other materials provided with the distribution.
437              
438             * Neither the name of Adjust.com
439             nor the names of its contributors may be used to endorse or promote
440             products derived from this software without specific prior written
441             permission.
442              
443             THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
444             "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
445             LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
446             A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
447             OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
448             SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
449             LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
450             DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
451             THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
452             (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
453             OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
454              
455              
456             =cut
457              
458             1; # End of PGObject::Util::Replication::Standby