File Coverage

blib/lib/Protocol/PostgreSQL.pm
Criterion Covered Total %
statement 154 206 74.7
branch 40 88 45.4
condition 0 5 0.0
subroutine 31 39 79.4
pod 25 25 100.0
total 250 363 68.8


line stmt bran cond sub pod time code
1             package Protocol::PostgreSQL;
2             # ABSTRACT: PostgreSQL wire protocol
3 5     5   51755 use strict;
  5         10  
  5         159  
4 5     5   26 use warnings;
  5         13  
  5         136  
5 5     5   837 use parent qw(Mixin::Event::Dispatch);
  5         358  
  5         35  
6              
7             our $VERSION = '0.008';
8              
9             =head1 NAME
10              
11             Protocol::PostgreSQL - support for the PostgreSQL wire protocol
12              
13             =head1 VERSION
14              
15             version 0.008
16              
17             =head1 SYNOPSIS
18              
19             use strict; use warnings;
20             package PostgreSQL::Client;
21             use parent q{Protocol::PostgreSQL::Client};
22              
23             sub new { my $self = shift->SUPER::new(@_); $self->{socket} = $self->connect(...); $self }
24             sub on_send_request { shift->socket->send(@_) }
25             sub socket { shift->{socket} }
26              
27             sub connect { ... } # provide a method to connect to the server
28             sub incoming { shift->socket->read(@_) } # provide a method which passes on data from server
29              
30             package main;
31             my $client = PostgreSQL::Client->new(user => ..., server => ..., database => ...);
32             $client->simple_query(sql => q{select * from table}, on_data_row => sub {
33             my ($client, %args) = @_;
34             my @cols = map { $_->{data} } @{ $args{row} };
35             print join(',', @cols) . "\n";
36             });
37              
38             =head1 DESCRIPTION
39              
40             Provides protocol-level support for PostgreSQL 7.4+, as defined in L<https://www.postgresql.org/docs/current/protocol.html>.
41              
42             =head2 CALLBACKS
43              
44             The following callbacks can be provided either as parameters to L</configure> or as methods in subclasses:
45              
46             =over 4
47              
48             =item * on_send_request - Called each time there is a new message to be sent to the other side of the connection.
49              
50             =item * on_authenticated - Called when authentication is complete
51              
52             =item * on_copy_data - we have received data from an ongoing COPY request
53              
54             =item * on_copy_complete - the active COPY request has completed
55              
56             =back
57              
58             For the client, the following additional callbacks are available:
59              
60             =over 4
61              
62             =item * on_request_ready - the server is ready for the next request
63              
64             =item * on_bind_complete - a Bind request has completed
65              
66             =item * on_close_complete - the Close request has completed
67              
68             =item * on_command_complete - the requested command has finished, this will typically be followed by an on_request_ready event
69              
70             =item * on_copy_in_response - indicates that the server is ready to receive COPY data
71              
72             =item * on_copy_out_response - indicates that the server is ready to send COPY data
73              
74             =item * on_copy_both_response - indicates that the server is ready to exchange COPY data (for replication)
75              
76             =item * on_data_row - data from the current query
77              
78             =item * on_empty_query - special-case response when sent an empty query, can be used for 'ping'. Typically followed by on_request_ready
79              
80             =item * on_error - server has raised an error
81              
82             =item * on_function_call_result - results from a function call
83              
84             =item * on_no_data - indicate that a query returned no data, typically followed by on_request_ready
85              
86             =item * on_notice - server has sent us a notice
87              
88             =item * on_notification - server has sent us a NOTIFY
89              
90             =item * on_parameter_description - parameters are being described
91              
92             =item * on_parameter_status - parameter status...
93              
94             =item * on_parse_complete - parsing is done
95              
96             =item * on_portal_suspended - the portal has been suspended, probably hit the row limit
97              
98             =item * on_ready_for_query - we're ready for queries
99              
100             =item * on_row_description - descriptive information about the rows we're likely to be seeing shortly
101              
102             =back
103              
104             And the server can send these events:
105              
106             =over 4
107              
108             =item * on_copy_fail - the frontend is indicating that the copy has failed
109              
110             =item * on_describe - request for something to be described
111              
112             =item * on_execute - request execution of a given portal
113              
114             =item * on_flush - request flush
115              
116             =item * on_function_call - request execution of a given function
117              
118             =item * on_parse - request to parse something
119              
120             =item * on_password - password information
121              
122             =item * on_query - simple query request
123              
124             =item * on_ssl_request - we have an SSL request
125              
126             =item * on_startup_message - we have a startup message
127              
128             =item * on_sync - sync request
129              
130             =item * on_terminate - termination request
131              
132             =back
133              
134             =cut
135              
136 5     5   40880 use Digest::MD5 ();
  5         19  
  5         75  
137 5     5   5659 use Time::HiRes ();
  5         11341  
  5         191  
138 5     5   5382 use POSIX qw{strftime};
  5         48405  
  5         97  
139 5     5   11000 use Protocol::PostgreSQL::RowDescription;
  5         16  
  5         284  
140 5     5   3319 use Protocol::PostgreSQL::Statement;
  5         16  
  5         249  
141              
142             # Currently v3.0, which is used in PostgreSQL 7.4+
143 5     5   47 use constant PROTOCOL_VERSION => 0x00030000;
  5         10  
  5         76754  
144              
145             # Currently-allowed list of callbacks (can be set via ->configure)
146             our @CALLBACKS_ALLOWED = qw(
147             on_send_request
148             on_authenticated
149             on_copy_data
150             on_copy_complete
151             on_request_ready
152             on_bind_complete
153             on_close_complete
154             on_command_complete
155             on_copy_in_response
156             on_copy_out_response
157             on_copy_both_response
158             on_data_row
159             on_empty_query
160             on_error
161             on_function_call_result
162             on_no_data
163             on_notice
164             on_notification
165             on_parameter_description
166             on_parameter_status
167             on_parse_complete
168             on_portal_suspended
169             on_ready_for_query
170             on_row_description
171             on_copy_fail
172             on_describe
173             on_execute
174             on_flush
175             on_function_call
176             on_parse
177             on_password
178             on_query
179             on_ssl_request
180             on_startup_message
181             on_sync
182             on_terminate
183             );
184             # Hash form for convenience
185             my %CALLBACK_MAP = map { $_ => 1 } @CALLBACKS_ALLOWED;
186              
187             # Types of authentication response
188             my %AUTH_TYPE = (
189             0 => 'AuthenticationOk',
190             2 => 'AuthenticationKerberosV5',
191             3 => 'AuthenticationCleartextPassword',
192             5 => 'AuthenticationMD5Password',
193             6 => 'AuthenticationSCMCredential',
194             7 => 'AuthenticationGSS',
195             9 => 'AuthenticationSSPI',
196             8 => 'AuthenticationGSSContinue',
197             );
198              
199             # Transaction states the backend can be in
200             my %BACKEND_STATE = (
201             I => 'idle',
202             T => 'transaction',
203             E => 'error'
204             );
205              
206             # used for error and notice responses
207             my %NOTICE_CODE = (
208             S => 'severity',
209             C => 'code',
210             M => 'message',
211             D => 'detail',
212             H => 'hint',
213             P => 'position',
214             p => 'internal_position',
215             q => 'internal_query',
216             W => 'where',
217             F => 'file',
218             L => 'line',
219             R => 'routine'
220             );
221              
222             # Mapping from name to backend message code (single byte)
223             our %MESSAGE_TYPE_BACKEND = (
224             AuthenticationRequest => 'R',
225             BackendKeyData => 'K',
226             BindComplete => '2',
227             CloseComplete => '3',
228             CommandComplete => 'C',
229             CopyData => 'd',
230             CopyDone => 'c',
231             CopyInResponse => 'G',
232             CopyOutResponse => 'H',
233             CopyBothResponse => 'W',
234             DataRow => 'D',
235             EmptyQueryResponse => 'I',
236             ErrorResponse => 'E',
237             FunctionCallResponse => 'V',
238             NoData => 'n',
239             NoticeResponse => 'N',
240             NotificationResponse => 'A',
241             ParameterDescription => 't',
242             ParameterStatus => 'S',
243             ParseComplete => '1',
244             PortalSuspended => 's',
245             ReadyForQuery => 'Z',
246             RowDescription => 'T',
247             );
248             our %BACKEND_MESSAGE_CODE = reverse %MESSAGE_TYPE_BACKEND;
249              
250             # Mapping from name to frontend message code (single byte)
251             our %MESSAGE_TYPE_FRONTEND = (
252             Bind => 'B',
253             Close => 'C',
254             CopyData => 'd',
255             CopyDone => 'c',
256             CopyFail => 'f',
257             Describe => 'D',
258             Execute => 'E',
259             Flush => 'H',
260             FunctionCall => 'F',
261             Parse => 'P',
262             PasswordMessage => 'p',
263             Query => 'Q',
264             # Both of these are handled separately
265             # SSLRequest => '',
266             # StartupMessage => '',
267             Sync => 'S',
268             Terminate => 'X',
269             );
270             our %FRONTEND_MESSAGE_CODE = reverse %MESSAGE_TYPE_FRONTEND;
271              
272             # Defined message handlers for outgoing frontend messages
273             our %FRONTEND_MESSAGE_BUILDER = (
274             # Bind parameters to an existing prepared statement
275             Bind => sub {
276             my $self = shift;
277             my %args = @_;
278              
279             $args{param} ||= [];
280             my $param = '';
281             my $count = scalar @{$args{param}};
282             foreach my $p (@{$args{param}}) {
283             if(!defined $p) {
284             $param .= pack('N1', 0xFFFFFFFF);
285             } else {
286             $param .= pack('N1a*', length($p), $p);
287             }
288             }
289             my $msg = pack('Z*Z*n1n1a*n1',
290             defined($args{portal}) ? $args{portal} : '',
291             defined($args{statement}) ? $args{statement} : '',
292             0, # Parameter types
293             $count, # Number of bound parameters
294             $param, # Actual parameter values
295             0 # Number of result column format definitions (0=use default text format)
296             );
297             push @{$self->{pending_bind}}, $args{sth} if $args{sth};
298             $self->debug(sub {
299             join('',
300             "Bind",
301             defined($args{portal}) ? " for portal [" . $args{portal} . "]" : '',
302             defined($args{statement}) ? " for statement [" . $args{statement} . "]" : '',
303             " with $count parameter(s): ",
304             join(',', @{$args{param}})
305             )
306             });
307             return $self->_build_message(
308             type => 'Bind',
309             data => $msg,
310             );
311             },
312             CopyData => sub {
313             my $self = shift;
314             my %args = @_;
315             return $self->_build_message(
316             type => 'CopyData',
317             data => pack('a*', $args{data})
318             );
319             },
320             Close => sub {
321             my $self = shift;
322             my %args = @_;
323             # Builds a Close message for a portal (when a 'portal' key is passed) or a prepared statement; 'on_complete' is an optional callback.
324             my $msg = pack('a1Z*',
325             exists $args{portal} ? 'P' : 'S', # close a portal or a statement
326             defined($args{statement})
327             ? $args{statement}
328             : (defined($args{portal})
329             ? $args{portal}
330             : ''
331             )
332             );
333             push @{$self->{pending_close}}, $args{on_complete}; # always queue an entry (possibly undef): CloseComplete shifts one per response, so skipping the push would hand a later request's callback to this one
334             return $self->_build_message(
335             type => 'Close',
336             data => $msg,
337             );
338             },
339             CopyDone => sub {
340             my $self = shift;
341             return $self->_build_message(
342             type => 'CopyDone',
343             data => '',
344             );
345             },
346             # Describe expected SQL results
347             Describe => sub {
348             my $self = shift;
349             my %args = @_;
350              
351             my $msg = pack('a1Z*', exists $args{portal} ? 'P' : 'S', defined($args{statement}) ? $args{statement} : (defined($args{portal}) ? $args{portal} : ''));
352             push @{$self->{pending_describe}}, $args{sth} if $args{sth};
353             return $self->_build_message(
354             type => 'Describe',
355             data => $msg,
356             ) . $self->_build_message(
357             type => 'Query',
358             data => "\0"
359             );
360             },
361             # Execute either a named or anonymous portal (prepared statement with bind vars)
362             Execute => sub {
363             my $self = shift;
364             my %args = @_;
365              
366             my $msg = pack('Z*N1', defined($args{portal}) ? $args{portal} : '', $args{limit} || 0);
367             push @{$self->{pending_execute}}, $args{sth} if $args{sth};
368             $self->debug("Executing " . (defined($args{portal}) ? "portal " . $args{portal} : "default portal") . ($args{limit} ? " with limit " . $args{limit} : " with no limit"));
369             return $self->_build_message(
370             type => 'Execute',
371             data => $msg,
372             );
373             },
374             # Parse SQL for a prepared statement
375             Parse => sub {
376             my $self = shift;
377             my %args = @_;
378             die "No SQL provided" unless defined $args{sql};
379              
380             my $msg = pack('Z*Z*n1', (defined($args{statement}) ? $args{statement} : ''), $args{sql}, 0);
381             return $self->_build_message(
382             type => 'Parse',
383             data => $msg,
384             );
385             },
386             # Password data, possibly encrypted depending on what the server specified
387             PasswordMessage => sub {
388             my $self = shift;
389             my %args = @_;
390              
391             my $pass = $args{password};
392             if($self->{password_type} eq 'md5') {
393             # md5hex of password . username,
394             # then md5hex result with salt appended
395             # then stick 'md5' at the front.
396             $pass = 'md5' . Digest::MD5::md5_hex(
397             Digest::MD5::md5_hex($pass . $self->{user})
398             . $self->{password_salt}
399             );
400             }
401              
402             # Yes, protocol requires zero-terminated string format even
403             # if we have a binary password value.
404             return $self->_build_message(
405             type => 'PasswordMessage',
406             data => pack('Z*', $pass)
407             );
408             },
409             # Simple query
410             Query => sub {
411             my $self = shift;
412             my %args = @_;
413             return $self->_build_message(
414             type => 'Query',
415             data => pack('Z*', $args{sql})
416             );
417             },
418             # Initial message informing the server which database and user we want
419             StartupMessage => sub {
420             my $self = shift;
421             die "Not first message" unless $self->is_first_message;
422              
423             my %args = @_;
424             my $parameters = join('', map { pack('Z*', $_) } map { $_, $args{$_} } grep { exists $args{$_} } qw(user database options));
425             $parameters .= "\0";
426              
427             return $self->_build_message(
428             type => undef,
429             data => pack('N*', PROTOCOL_VERSION) . $parameters
430             );
431             },
432             # Synchronise after a prepared statement has finished execution.
433             Sync => sub {
434             my $self = shift;
435             return $self->_build_message(
436             type => 'Sync',
437             data => '',
438             );
439             },
440             Terminate => sub {
441             my $self = shift;
442             return $self->_build_message(
443             type => 'Terminate',
444             data => '',
445             );
446             },
447             );
448              
449             # Handlers for specific authentication messages from backend.
450             my %AUTH_HANDLER = (
451             AuthenticationOk => sub {
452             my ($self, $msg) = @_;
453             $self->invoke_event('authenticated');
454             $self->invoke_event('request_ready');
455             return $self;
456             },
457             AuthenticationKerberosV5 => sub {
458             my ($self, $msg) = @_;
459             die "Not yet implemented";
460             },
461             AuthenticationCleartextPassword => sub {
462             my ($self, $msg) = @_;
463             $self->{password_type} = 'plain';
464             $self->invoke_event('password');
465             return $self;
466             },
467             AuthenticationMD5Password => sub {
468             my ($self, $msg) = @_;
469             (undef, undef, undef, my $salt) = unpack('C1N1N1a4', $msg);
470             $self->{password_type} = 'md5';
471             $self->{password_salt} = $salt;
472             $self->invoke_event('password');
473             return $self;
474             },
475             AuthenticationSCMCredential => sub {
476             my ($self, $msg) = @_;
477             die "Not yet implemented";
478             return $self;
479             },
480             AuthenticationGSS => sub {
481             my ($self, $msg) = @_;
482             die "Not yet implemented";
483             },
484             AuthenticationSSPI => sub {
485             my ($self, $msg) = @_;
486             die "Not yet implemented";
487             },
488             AuthenticationGSSContinue => sub {
489             my ($self, $msg) = @_;
490             die "Not yet implemented";
491             }
492             );
493              
494             # Defined message handlers for incoming messages from backend
495             our %BACKEND_MESSAGE_HANDLER = (
496             # We had some form of authentication request or response, pass it over to an auth handler to deal with it further.
497             AuthenticationRequest => sub {
498             my $self = shift;
499             my $msg = shift;
500              
501             my (undef, undef, $auth_code) = unpack('C1N1N1', $msg);
502             my $auth_type = $AUTH_TYPE{$auth_code} or die "Invalid auth code $auth_code received";
503             $self->debug("Auth message [$auth_type]");
504             return $AUTH_HANDLER{$auth_type}->($self, $msg);
505             },
506             # Key data for cancellation requests
507             BackendKeyData => sub {
508             my $self = shift;
509             my $msg = shift;
510             (undef, my $size, my $pid, my $key) = unpack('C1N1N1N1', $msg);
511             $self->invoke_event('backendkeydata',
512             pid => $pid,
513             key => $key
514             );
515             return $self;
516             },
517             # A bind operation has completed
518             BindComplete => sub {
519             my $self = shift;
520             my $msg = shift;
521             (undef, my $size) = unpack('C1N1', $msg);
522             if(my $sth = shift(@{$self->{pending_bind}})) {
523             $self->debug("Pass over to statement $sth");
524             $sth->bind_complete;
525             }
526             $self->invoke_event('bind_complete');
527             return $self;
528             },
529             # A Close request for a statement or portal has completed
530             CloseComplete => sub {
531             my $self = shift;
532             my $msg = shift;
533             (undef, my $size) = unpack('C1N1', $msg);
534              
535             # Handler could be undef - we always push something to keep things symmetrical
536             if(my $handler = shift @{$self->{pending_close}}) {
537             $handler->($self);
538             }
539             $self->invoke_event('close_complete');
540             return $self;
541             },
542             # A command has completed, we should see a ready response immediately after this
543             CommandComplete => sub {
544             my $self = shift;
545             my $msg = shift;
546             my (undef, undef, $result) = unpack('C1N1Z*', $msg);
547             if(@{$self->{pending_execute}}) {
548             my $last = shift @{$self->{pending_execute}};
549             $self->debug("Finished command for $last");
550             $last->command_complete if $last;
551             }
552             $self->invoke_event('command_complete', result => $result);
553             return $self;
554             },
555             # We have a COPY response from the server indicating that it's ready to accept COPY data
556             CopyInResponse => sub {
557             my $self = shift;
558             my $msg = shift;
559             (undef, undef, my $type, my $count) = unpack('C1N1C1n1', $msg);
560             substr $msg, 0, 8, '';
561             my @formats;
562             for (1..$count) {
563             push @formats, unpack('n1', $msg);
564             substr $msg, 0, 2, '';
565             }
566             $self->invoke_event('copy_in_response', count => $count, columns => \@formats);
567             return $self;
568             },
569             # The basic SQL result - a single row of data
570             DataRow => sub {
571             my $self = shift;
572             my $msg = shift;
573             my (undef, undef, $count) = unpack('C1N1n1', $msg);
574             substr $msg, 0, 7, '';
575             my @fields;
576             # TODO Tidy this up
577             my $sth = @{$self->{pending_execute}} ? $self->{pending_execute}[0] : $self->active_statement;
578             my $desc = $sth ? $sth->row_description : $self->row_description;
579             foreach my $idx (0..$count-1) {
580             my $field = $desc->field_index($idx);
581             my ($size) = unpack('N1', $msg);
582             substr $msg, 0, 4, '';
583             my $data;
584             my $null = ($size == 0xFFFFFFFF);
585             unless($null) {
586             $data = $field->parse_data($msg, $size);
587             substr $msg, 0, $size, '';
588             }
589             push @fields, {
590             null => $null,
591             description => $field,
592             data => $null ? undef : $data,
593             }
594             }
595             $sth->data_row(\@fields) if $sth;
596             $self->invoke_event('data_row', row => \@fields);
597             return $self;
598             },
599             # Response given when empty query (whitespace only) is provided
600             EmptyQueryResponse => sub {
601             my $self = shift;
602             my $msg = shift;
603             if(@{$self->{pending_execute}}) {
604             my $last = shift @{$self->{pending_execute}};
605             $self->debug("Finished command for $last");
606             }
607             $self->invoke_event('empty_query');
608             return $self;
609             },
610             # An error occurred, can indicate that connection is about to close or just be a warning. Fields are collected into a hash keyed by the names in %NOTICE_CODE and passed to the 'error' event.
611             ErrorResponse => sub {
612             my $self = shift;
613             my $msg = shift;
614             (undef, my $size) = unpack('C1N1', $msg);
615             substr $msg, 0, 5, ''; # drop the message type byte and the length word
616             my %notice;
617             FIELD:
618             while(length($msg)) {
619             my ($code, $str) = unpack('A1Z*', $msg);
620             last FIELD unless $code && $code ne "\0";
621             # Field types this module does not know about are skipped rather than treated as fatal:
622             # the protocol allows new field types to be added and asks frontends to ignore them.
623             $notice{$NOTICE_CODE{$code}} = $str if exists $NOTICE_CODE{$code};
624             substr $msg, 0, 2+length($str), ''; # code byte + string + terminating NUL, consumed for known and unknown fields alike
625             }
626             if(@{$self->{pending_execute}}) {
627             my $last = shift @{$self->{pending_execute}};
628             $self->debug("Error for $last");
629             }
630             $self->invoke_event('error', error => \%notice);
631             return $self;
632             },
633             # Result from calling a function: extracts the result value (undef when the length word is 0xFFFFFFFF) and raises an event carrying it as 'data'.
634             FunctionCallResponse => sub {
635             my $self = shift;
636             my $msg = shift;
637             (undef, my $size, my $len) = unpack('C1N1N1', $msg);
638             substr $msg, 0, 9, ''; # drop type byte, message length and result length (1 + 4 + 4 bytes)
639             my $data = ($len == 0xFFFFFFFF) ? undef : substr $msg, 0, $len;
640             $self->invoke_event('function_call_response', data => $data); # NOTE(review): the callback list names this on_function_call_result, while the event raised here is 'function_call_response' - confirm the two are meant to line up
641             return $self;
642             },
643             # No data follows
644             NoData => sub {
645             my $self = shift;
646             my $msg = shift;
647             (undef, my $size) = unpack('C1N1', $msg);
648             $self->invoke_event('no_data');
649             return $self;
650             },
651             # We have a notice, which is like an error but can be just informational. Fields are collected into a hash keyed by the names in %NOTICE_CODE and passed to the 'notice' event.
652             NoticeResponse => sub {
653             my $self = shift;
654             my $msg = shift;
655             (undef, my $size) = unpack('C1N1', $msg);
656             substr $msg, 0, 5, ''; # drop the message type byte and the length word
657             my %notice;
658             FIELD:
659             while(length($msg)) {
660             my ($code, $str) = unpack('A1Z*', $msg);
661             last FIELD unless $code && $code ne "\0";
662             # Unrecognised field types are skipped, not fatal: the protocol allows new ones to be added and asks frontends to ignore them.
663             $notice{$NOTICE_CODE{$code}} = $str if exists $NOTICE_CODE{$code};
664             substr $msg, 0, 2+length($str), ''; # code byte + string + terminating NUL, consumed for known and unknown fields alike
665             }
666             $self->invoke_event('notice', notice => \%notice);
667             return $self;
668             },
670             # LISTEN/NOTIFY mechanism: decodes the notifying backend's pid, the channel name and the payload, then raises the 'notification' event.
671             NotificationResponse => sub { # key spelled to match the NotificationResponse entry in %MESSAGE_TYPE_BACKEND
672             my $self = shift;
673             my $msg = shift;
674             (undef, my $size, my $pid, my $channel, my $data) = unpack('C1N1N1Z*Z*', $msg);
675             $self->invoke_event('notification', pid => $pid, channel => $channel, data => $data);
676             return $self;
677             },
678             # Connection parameter information
679             ParameterStatus => sub {
680             my $self = shift;
681             my $msg = shift;
682             (undef, my $size) = unpack('C1N1', $msg);
683             substr $msg, 0, 5, '';
684             my %status;
685             # Series of key,value pairs
686             PARAMETER:
687             while(1) {
688             my ($k, $v) = unpack('Z*Z*', $msg);
689             last PARAMETER unless defined($k) && length($k);
690             $status{$k} = $v;
691             substr $msg, 0, length($k) + length($v) + 2, '';
692             }
693             $self->invoke_event('parameter_status', status => \%status);
694             return $self;
695             },
696             # Description of the parameters for a statement, given as a list of data type OIDs
697             ParameterDescription => sub {
698             my $self = shift;
699             my $msg = shift;
700             (undef, my $size, my $count) = unpack('C1N1n1', $msg);
701             substr $msg, 0, 7, '';
702             my @oid_list;
703             for my $idx (1..$count) {
704             my ($oid) = unpack('N1', $msg);
705             substr $msg, 0, 4, '';
706             push @oid_list, $oid;
707             }
708             $self->invoke_event('parameter_description', parameters => \@oid_list);
709             return $self;
710             },
711             # Parse request succeeded
712             ParseComplete => sub {
713             my $self = shift;
714             my $msg = shift;
715             (undef, my $size) = unpack('C1N1', $msg);
716             $self->active_statement->parse_complete if $self->active_statement;
717             $self->invoke_event('parse_complete');
718             return $self;
719             },
720             # Portal has sent enough data to meet the row limit, should be requested again if more is required
721             PortalSuspended => sub {
722             my $self = shift;
723             my $msg = shift;
724             (undef, my $size) = unpack('C1N1', $msg);
725             if(@{$self->{pending_execute}}) {
726             my $last = shift @{$self->{pending_execute}};
727             $self->debug("Suspended portal for $last");
728             }
729             $self->invoke_event('portal_suspended');
730             return $self;
731             },
732             # All ready to accept queries
733             ReadyForQuery => sub {
734             my $self = shift;
735             my $msg = shift;
736             my (undef, undef, $state) = unpack('C1N1A1', $msg);
737             $self->debug("Backend state is $state");
738             $self->backend_state($BACKEND_STATE{$state});
739             $self->is_ready(1);
740             return $self->send_next_in_queue if $self->has_queued;
741             $self->invoke_event('ready_for_query');
742             return $self;
743             },
744             # Information on the row data that's expected to follow
745             RowDescription => sub {
746             my $self = shift;
747             my $msg = shift;
748             my (undef, undef, $count) = unpack('C1N1n1', $msg);
749             my $row = Protocol::PostgreSQL::RowDescription->new;
750             substr $msg, 0, 7, '';
751             foreach my $id (0..$count-1) {
752             my ($name, $table_id, $field_id, $data_type, $data_size, $type_modifier, $format_code) = unpack('Z*N1n1N1n1N1n1', $msg);
753             my %data = (
754             name => $name,
755             table_id => $table_id,
756             field_id => $field_id,
757             data_type => $data_type,
758             data_size => $data_size,
759             type_modifier => $type_modifier,
760             format_code => $format_code
761             );
762             $self->debug($_ . ' => ' . $data{$_}) for sort keys %data;
763             my $field = Protocol::PostgreSQL::FieldDescription->new(%data);
764             $row->add_field($field);
765             substr $msg, 0, 19 + length($name), '';
766             }
767             $self->row_description($row);
768             if(my $last = shift @{$self->{pending_describe}}) {
769             $last->row_description($row);
770             }
771             $self->invoke_event('row_description', description => $row);
772             return $self;
773             },
774             );
775              
776             =head1 METHODS
777              
778             =cut
779              
780             =head2 new
781              
782             Instantiate a new object. Blesses an empty hashref and calls L</configure>; subclasses can bypass this entirely
783             and just call L</configure> directly after instantiation.
784              
785             =cut
786              
# Constructor. Blesses a fresh, empty hashref into the invoking class, passes
# every remaining argument on to ->configure, and returns the new object.
sub new {
    my ($class, @params) = @_;
    my $self = bless {}, $class;
    $self->configure(@params);
    return $self;
}
793              
794             =head2 configure
795              
796             Does the real preparation for the object.
797              
798             Takes callbacks as named parameters, including:
799              
800             =over 4
801              
802             =item * on_error
803              
804             =item * on_data_row
805              
806             =item * on_ready_for_query
807              
808             =back
809              
810             =cut
811              
# Apply named parameters to the object.
#
# Parameters (all optional, passed as a key/value list):
#   debug, user, pass, database - stored on the object under the same key
#   on_*                        - event callbacks; the part after "on_" is the
#                                 event name handed to ->add_handler_for_event
#
# Returns the key/value pairs that were not consumed here.
# Dies with "Unknown callback '...'" if an on_* key is not in %CALLBACK_MAP.
sub configure {
    my ($self, %args) = @_;

    # Internal state defaults - only filled in when the slot does not exist yet,
    # so calling configure again does not reset them.
    $self->{$_} = [] for grep { !exists $self->{$_} } qw(pending_execute pending_describe message_queue);
    $self->{$_} = 0 for grep { !exists $self->{$_} } qw(authenticated message_count);
    $self->{wait_for_startup} = 1 unless exists $self->{wait_for_startup};

    # Plain settings are moved out of %args onto the object
    for my $setting (qw(debug user pass database)) {
        $self->{$setting} = delete $args{$setting} if exists $args{$setting};
    }

    # Callbacks. The pattern is anchored at the start of the key: an unanchored
    # /on_/ also matches ordinary option names such as "connection_timeout",
    # which would then be rejected as an unknown callback instead of being
    # returned to the caller untouched.
    foreach my $k (grep { /^on_(.+)$/ } keys %args) {
        my ($event) = $k =~ /^on_(.+)$/;
        die "Unknown callback '$k'" unless exists $CALLBACK_MAP{$k};
        $self->add_handler_for_event($event => delete $args{$k});
    }
    return %args;
}
835              
836             =head2 has_queued
837              
838             Returns number of queued messages.
839              
840             =cut
841              
# Number of entries currently held in the object's message_queue arrayref.
sub has_queued {
    my ($self) = @_;
    my $pending = $self->{message_queue};
    return scalar @$pending;
}
843              
844             =head2 is_authenticated
845              
846             Returns true if we are authenticated (and can start sending real data).
847              
848             =cut
849              
# Normalises the object's 'authenticated' slot to exactly 1 or 0.
sub is_authenticated {
    my ($self) = @_;
    return 1 if $self->{authenticated};
    return 0;
}
851              
852             =head2 is_first_message
853              
854             Returns true if this is the first message, as per L<https://www.postgresql.org/docs/current/protocol-overview.html>:
855              
856             "For historical reasons, the very first message sent by the client (the startup message)
857             has no initial message-type byte."
858              
859             =cut
860              
# True while the object's message_count is still below one, i.e. no message
# has been built through ->message yet.
sub is_first_message {
    my ($self) = @_;
    return $self->{message_count} < 1;
}
862              
863             =head2 initial_request
864              
865             Generate and send the startup request.
866              
867             =cut
868              
# Build and send the StartupMessage.
#
# Accepts optional 'database' and 'user' named parameters; when one is not
# passed, the value stored on the object is used instead. Undefined values are
# left out of the message. Any other named parameter is fatal.
# Clears the wait_for_startup flag once the message has been sent.
# Returns $self.
sub initial_request {
    my ($self, %args) = @_;

    my %param;
    for my $key (qw(database user)) {
        my $value = exists $args{$key} ? delete $args{$key} : $self->{$key};
        $param{$key} = $value if defined $value;
    }

    # Anything still left in %args is something we were not expecting
    if (my @unknown = keys %args) {
        die "don't know how to handle " . join(',', @unknown);
    }

    $self->send_message('StartupMessage', %param);
    $self->{wait_for_startup} = 0;
    return $self;
}
880              
881             =head2 send_message
882              
883             Send a message.
884              
885             =cut
886              
# Build a message via ->message (all arguments are passed straight through)
# and emit it immediately through the 'send_request' event.
# Dies with "Empty message?" if the builder returned undef. Returns $self.
sub send_message {
    my ($self, @spec) = @_;

    # We are about to put a message on the wire, so drop the ready-to-send flag
    # to keep anything else from being sent alongside it.
    $self->{is_ready} = 0;

    my $msg = $self->message(@spec);
    die "Empty message?" unless defined $msg;

    # The packet breakdown is slow, so it is wrapped in a coderef that ->debug
    # only runs when debugging is switched on.
    my $describe = sub {
        my @bytes = split //, $msg;
        my $hex = join(" ", map { sprintf("%02x", ord($_)) } @bytes);
        my $label = ($self->is_first_message ? "startup packet" : $FRONTEND_MESSAGE_CODE{substr($msg, 0, 1)}) || 'unknown message';
        # Characters outside the listed set are shown as '.'
        my $printable = join('', '', map { (my $txt = defined($_) ? $_ : '') =~ tr/ []"'!#$%*&=:;A-Za-z0-9,()_ -/./c; $txt } @bytes);
        return "send data: [" . $hex . "], " . $label . " (" . $printable . ")";
    };
    $self->debug($describe);

    $self->invoke_event('send_request', $msg);
    return $self;
}
907              
908             =head2 queue
909              
910             Queue up a message for sending. The message will only be sent when we're in ReadyForQuery
911             mode, which could be immediately or later.
912              
913             =cut
914              
# Append a message to the send queue and flush one entry if we are ready.
#
# Named parameters:
#   message    - a ready-built packet; when false, one is built from:
#   type       - message type passed to ->message
#   parameters - optional arrayref of extra arguments for ->message
# Any other key (e.g. callback) is stored with the queued entry.
# Returns $self.
sub queue {
    my ($self, %args) = @_;

    my $msg = delete $args{message};
    if (!$msg) {
        # No prebuilt packet, so construct one; the parameter list is optional
        my $type = delete $args{type};
        my $parameters = delete($args{parameters}) || [];
        $msg = $self->message($type, @$parameters);
    }

    my %entry = (message => $msg, %args);
    push @{ $self->{message_queue} }, \%entry;

    # Goes out straight away when the connection is idle
    $self->send_next_in_queue if $self->is_ready;
    return $self;
}
936              
937             =head2 send_next_in_queue
938              
939             Send the next queued message.
940              
941             =cut
942              
# Take the oldest entry off message_queue, emit its packet through the
# 'send_request' event and, if the entry has a 'callback', call it with
# ($self, $entry). Does nothing when the queue is empty. Returns $self.
#
# TODO: the debug/send logic duplicates what send_message does.
sub send_next_in_queue {
    my ($self) = @_;

    my $info = shift @{ $self->{message_queue} };
    return $self unless $info;

    my $msg = delete $info->{message};

    # Only one message per ready state - clear the flag so the rest of the
    # queue waits for the server.
    $self->{is_ready} = 0;

    $self->debug(sub {
        my @bytes = split //, $msg;
        my $hex = join(" ", map { sprintf("%02x", ord($_)) } @bytes);
        my $text = join("", grep { /^([a-z0-9,()_ -])$/ } @bytes);
        return "send data: [" . $hex . "], " . $FRONTEND_MESSAGE_CODE{substr($msg, 0, 1)} . " (" . $text . ")";
    });
    $self->invoke_event('send_request', $msg);

    # Let the originator know the message is now in flight
    if (exists $info->{callback}) {
        $info->{callback}->($self, $info);
    }
    return $self;
}
962              
963             =head2 message
964              
965             Creates a new message of the given type.
966              
967             =cut
968              
# Build a frontend message of the given type by looking the type up in
# %FRONTEND_MESSAGE_BUILDER and calling the builder with ($self, remaining
# arguments). Increments message_count after the builder has run.
# Dies with "Message <type> unknown" when no builder exists.
# Returns whatever the builder returned.
sub message {
    my ($self, $type, @params) = @_;
    exists $FRONTEND_MESSAGE_BUILDER{$type}
        or die "Message $type unknown";

    my $builder = $FRONTEND_MESSAGE_BUILDER{$type};
    my $msg = $builder->($self, @params);
    $self->{message_count}++;
    return $msg;
}
978              
979             =head2 attach_event
980              
981             Attach new handler(s) to the given event(s).
982              
983             =cut
984              
# Legacy interface: attach handler(s) to event(s), given as event => coderef
# pairs. Each handler is registered through ->add_handler_for_event, wrapped so
# that the registered sub always returns 1, and is also recorded under
# $self->{_callback}{"on_<event>"}.
#
# Dies with "Unknown callback '<event>'" if "on_<event>" is not a key of
# %CALLBACK_MAP; nothing is registered in that case.
# Returns $self.
sub attach_event {
    my ($self, %args) = @_;

    # ->debug does not sprintf its arguments, so the text is formatted here;
    # the coderef form defers that work until debugging is actually enabled.
    $self->debug(sub {
        sprintf(
            "Using old ->attach_event interface, suggest ->add_handler_for_event from Mixin::Event::Dispatch instead for %s",
            join(',', keys %args)
        )
    });

    # Validate every name before touching any state, so that a bad name does
    # not leave some handlers registered.
    for my $event (keys %args) {
        die "Unknown callback '$event'" unless exists $CALLBACK_MAP{"on_$event"};
    }

    my @handlers;
    for my $event (keys %args) {
        # Copy into a lexical: a closure over $_ would look at the global $_ at
        # the time the event fires, not at the key it was created for.
        my $code = $args{$event};
        push @handlers, $event => sub { $code->(@_); 1 };
        $self->{_callback}->{"on_$event"} = $code;
    }
    $self->add_handler_for_event(@handlers);
    return $self;
}
999              
1000             =head2 detach_event
1001              
1002             Detach handler(s) from the given event(s). Not implemented.
1003              
1004             =cut
1005              
# Placeholder: detaching is not implemented. Emits a warning that points at
# ->add_handler_for_event and returns $self.
sub detach_event {
    my ($self) = @_;
    warn "detach_event not implemented, see ->add_handler_for_event in Mixin::Event::Dispatch";
    return $self;
}
1011              
1012             =head2 debug
1013              
1014             Helper method to report debug information. Can take a string or a coderef.
1015              
1016             =cut
1017              
# Report a debug message.
#
# The first argument is the message, either a string or a coderef that returns
# the string (only called when debugging is enabled). What happens next depends
# on $self->{debug}:
#   false       - nothing is done
#   plain true  - the message is warn()ed with a UTC timestamp (ms precision)
#   coderef     - called with the message followed by any extra arguments
#   other ref   - dies with "Unknown debug setting ..."
sub debug {
    my ($self, $msg, @extra) = @_;
    my $target = $self->{debug};
    return unless $target;

    # Lazily-built messages are resolved only now that we know they are wanted
    $msg = $msg->() if ref($msg) eq 'CODE';

    if (ref($target) eq 'CODE') {
        $target->($msg, @extra);
        return;
    }

    unless (ref $target) {
        my $now = Time::HiRes::time;
        my $stamp = strftime("%Y-%m-%d %H:%M:%S", gmtime($now));
        my $millis = sprintf(".%03d", int($now * 1000.0) % 1000.0);
        warn $stamp . $millis . " $msg\n";
        return;
    }

    die "Unknown debug setting " . $target;
}
1035              
1036             =head2 handle_message
1037              
1038             Handle an incoming message from the server.
1039              
1040             =cut
1041              
# Dispatch one complete incoming backend message.
#
# The first byte of $msg is the message code; it is mapped to a type name via
# %BACKEND_MESSAGE_CODE and the matching entry in %BACKEND_MESSAGE_HANDLER is
# called with ($self, $msg). The is_ready flag is cleared before dispatch.
#
# Returns whatever the handler returns.
# Dies if $msg is empty/undefined, if the code is not a known backend message
# code, or if there is no handler for the resulting type.
sub handle_message {
    my ($self, $msg) = @_;

    # Without at least the code byte there is nothing to dispatch on
    die "No message data to handle" unless defined $msg && length $msg;

    # Extract code and identify which message handler to use
    my $code = substr $msg, 0, 1;
    my $type = $BACKEND_MESSAGE_CODE{$code};

    $self->debug(sub {
        "recv data: [" . join(" ", map { sprintf("%02x", ord($_)) } split //, $msg) . "], " . (defined $type ? $type : 'unknown message')
    });

    # An unrecognised code used to fall through with $type undefined, which
    # produced uninitialized-value warnings and a blank "No handler for " error.
    die sprintf("No handler for unknown message code 0x%02x", ord $code) unless defined $type;

    $self->debug("Handle [$type] message");
    die "No handler for $type" unless exists $BACKEND_MESSAGE_HANDLER{$type};

    # Clear the ready-to-send flag until we've processed this
    $self->{is_ready} = 0;
    return $BACKEND_MESSAGE_HANDLER{$type}->($self, $msg);
}
1059              
1060             =head2 message_length
1061              
1062             Returns the length of the given message.
1063              
1064             =cut
1065              
# Read the length field from the start of a message buffer.
#
# The buffer must hold at least 5 bytes: one code byte followed by a 32-bit
# big-endian length. Returns that length value, or undef when fewer than
# 5 bytes are available.
sub message_length {
    my ($self, $msg) = @_;
    return undef if length($msg) < 5;

    # The leading code byte is skipped; only the length is of interest
    my (undef, $len) = unpack 'C1N1', substr($msg, 0, 5);
    return $len;
}
1073              
1074             =head2 simple_query
1075              
1076             Send a simple query to the server - only supports plain queries (no bind parameters).
1077              
1078             =cut
1079              
# Queue a simple (parameterless) query.
#
# Takes the SQL text as its only argument, builds a 'Query' message from it and
# hands that to ->queue. Dies with "Invalid backend state" when the backend
# state is 'error'. Returns $self.
sub simple_query {
    my ($self, $sql) = @_;
    die "Invalid backend state" if $self->backend_state eq 'error';

    $self->debug("Running query [$sql]");
    my $packet = $self->message('Query', sql => $sql);
    $self->queue(message => $packet);
    return $self;
}
1091              
1092             =head2 copy_data
1093              
1094             Send copy data to the server.
1095              
1096             =cut
1097              
# Send a CopyData message carrying $data, bypassing the queue.
# Dies with "Invalid backend state" when the backend state is 'error'.
# Returns $self.
sub copy_data {
    my ($self, $data) = @_;
    if ($self->backend_state eq 'error') {
        die "Invalid backend state";
    }

    $self->send_message('CopyData', data => $data);
    return $self;
}
1106              
1107             =head2 copy_done
1108              
1109             Indicate that the COPY data from the client is complete.
1110              
1111             =cut
1112              
# Send a CopyDone message to tell the server the client's COPY data is
# complete. Dies with "Invalid backend state" when the backend state is
# 'error'. Returns $self.
sub copy_done {
    my ($self) = @_;
    if ($self->backend_state eq 'error') {
        die "Invalid backend state";
    }

    $self->send_message('CopyDone');
    return $self;
}
1121              
1122             =head2 backend_state
1123              
1124             Accessor for current backend state.
1125              
1126             =cut
1127              
# Combined getter/setter for the backend state.
#
# With no argument, returns the stored state. With an argument, the value must
# be one of 'idle', 'transaction' or 'error' (otherwise dies with
# "bad state code"); it is stored and $self is returned.
sub backend_state {
    my ($self, @new) = @_;
    return $self->{backend_state} unless @new;

    my $state = $new[0];
    my $known = grep { $state eq $_ } qw(idle transaction error);
    die "bad state code" unless $known;

    $self->{backend_state} = $state;
    return $self;
}
1139              
1140             =head2 active_statement
1141              
1142             Returns the currently active L<Protocol::PostgreSQL::Statement> if we have one.
1143              
1144             =cut
1145              
# Combined getter/setter for the active statement.
# With an argument: stores it and returns $self. Without: returns the stored
# value (undef if none has been set).
sub active_statement {
    my ($self, @new) = @_;
    return $self->{active_statement} unless @new;

    $self->{active_statement} = $new[0];
    return $self;
}
1154              
1155             =head2 row_description
1156              
1157             Accessor for row description.
1158              
1159             =cut
1160              
# Combined getter/setter for the row description.
# With an argument: stores it and returns $self. Without: returns the stored
# value (undef if none has been set).
sub row_description {
    my ($self, @new) = @_;
    return $self->{row_description} unless @new;

    $self->{row_description} = $new[0];
    return $self;
}
1169              
1170             =head2 prepare
1171              
1172             Prepare a L<Protocol::PostgreSQL::Statement>. Intended to be mostly compatible with the L<DBI>
1173             ->prepare method.
1174              
1175             =cut
1176              
# Convenience wrapper: takes the SQL text as a positional argument and
# forwards it to ->prepare_async as the 'sql' named parameter, returning
# whatever that returns.
sub prepare {
    my ($self, $sql) = @_;
    return $self->prepare_async(sql => $sql);
}
1182              
1183             =head2 prepare_async
1184              
1185             Set up a L<Protocol::PostgreSQL::Statement> allowing callbacks and other options to be provided.
1186              
1187             =cut
1188              
# Create a Protocol::PostgreSQL::Statement for this connection.
#
# Takes named parameters; 'sql' is required (dies with
# "SQL statement not provided" when it is undefined). All parameters are passed
# to the statement constructor together with dbh => $self.
# Returns the new statement object.
sub prepare_async {
    my ($self, %args) = @_;
    defined $args{sql}
        or die "SQL statement not provided";

    my $statement = Protocol::PostgreSQL::Statement->new(dbh => $self, %args);
    return $statement;
}
1200              
1201             =head2 is_ready
1202              
1203             Returns true if we're ready to send more data to the server.
1204              
1205             =cut
1206              
# Combined getter/setter for the ready-to-send flag.
#
# With an argument: stores it as the flag and returns $self.
# Without: returns 0 while wait_for_startup is set, otherwise the stored flag.
sub is_ready {
    my ($self, @value) = @_;
    if (@value) {
        $self->{is_ready} = $value[0];
        return $self;
    }
    return $self->{wait_for_startup} ? 0 : $self->{is_ready};
}
1216              
1217             =head2 send_copy_data
1218              
1219             Send COPY data to the server. Takes an arrayref and replaces any reserved characters with quoted versions.
1220              
1221             =cut
1222              
# Send one row of COPY data to the server in text format.
#
# Takes an arrayref of column values. Undefined values are sent as \N; in
# defined values the backslash, backspace, form feed, newline, carriage return,
# tab and vertical tab characters are replaced with their backslash escapes.
# Columns are joined with tabs and the row is terminated with a newline, then
# wrapped in a CopyData packet and emitted through the 'send_request' event.
# The caller's array is left unmodified. Returns $self.
sub send_copy_data {
    my ($self, $data) = @_;

    # Escapes are applied in a single pass so that the backslashes introduced
    # by one replacement are never re-escaped by another.
    my %escape_for = (
        "\\"   => "\\\\",
        "\x08" => "\\b",
        "\f"   => "\\f",
        "\n"   => "\\n",
        "\r"   => "\\r",
        "\t"   => "\\t",
        "\x0b" => "\\v",
    );

    my @fields = map {
        # Work on a copy: $_ aliases the caller's element inside map
        my $field = $_;
        if (defined $field) {
            # \x0b is spelled out because \v in a Perl regex is the
            # vertical-whitespace class, not the vertical tab character.
            $field =~ s/([\\\x08\f\n\r\t\x0b])/$escape_for{$1}/g;
        }
        else {
            $field = '\N';
        }
        $field;
    } @$data;

    my $content = pack 'a*', (join("\t", @fields) . "\n");

    # Length covers the 4-byte length field itself plus the payload
    $self->invoke_event('send_request', $MESSAGE_TYPE_FRONTEND{'CopyData'} . pack('N1', 4 + length $content) . $content);
    return $self;
}
1243              
1244             =head2 _build_message
1245              
1246             Construct a new message.
1247              
1248             =cut
1249              
# Assemble a wire-format message.
#
# Named parameters:
#   type - message type name, looked up in %MESSAGE_TYPE_FRONTEND for the
#          leading type byte (the key must be present, the value may be undef)
#   data - payload
# Dies with "No type provided" / "No data provided" when a key is absent.
# Returns type byte + 32-bit big-endian length + payload; the type byte is
# omitted while ->is_first_message is true.
sub _build_message {
    my ($self, %args) = @_;

    exists $args{type} or die "No type provided";
    exists $args{data} or die "No data provided";

    my $payload = $args{data};

    # Length includes the 4-byte length field, but not the type byte
    my $size = length($payload) + 4;
    my $prefix = $self->is_first_message ? '' : $MESSAGE_TYPE_FRONTEND{$args{type}};
    return $prefix . pack('N1', $size) . $payload;
}
1263              
1264             1;
1265              
1266             __END__