File Coverage

blib/lib/Protocol/PostgreSQL/Statement.pm
Criterion Covered Total %
statement 44 102 43.1
branch 8 40 20.0
condition 0 6 0.0
subroutine 13 27 48.1
pod 12 15 80.0
total 77 190 40.5


line stmt bran cond sub pod time code
1             package Protocol::PostgreSQL::Statement;
2             BEGIN {
3 5     5   3586 $Protocol::PostgreSQL::Statement::VERSION = '0.008';
4             }
5 5     5   31 use strict;
  5         9  
  5         474  
6 5     5   26 use warnings;
  5         9  
  5         170  
7 5     5   906 use parent qw(Mixin::Event::Dispatch);
  5         428  
  5         39  
8 5     5   20092 use Scalar::Util;
  5         10  
  5         255  
9 5     5   7441 use Data::Dumper;
  5         80501  
  5         7072  
10              
11             =head1 NAME
12              
13             Protocol::PostgreSQL::Statement - prepared statement handling
14              
15             =head1 VERSION
16              
17             version 0.008
18              
19             =head1 SYNOPSIS
20              
21             use Protocol::PostgreSQL;
22             my %cache;
23             # Helper method to apply the returned values
24             my $set_cache = sub {
25             my ($sth, $row) = @_;
26             my ($k, $v) = map { $row->[$_]{data} } 0..1;
27             warn "Set $k to $v\n";
28             $cache{$k} = $v;
29             };
30             # Prepared statement to insert a new value, called when no existing value was found
31             my $add_sth = Protocol::PostgreSQL::Statement->new(
32             dbh => $dbh,
33             sql => 'insert into sometable (name) values $1 returning id, name',
34             on_data_row => $set_cache,
35             on_no_data => sub {
36             die "Had no response when trying to add value";
37             }
38             );
39             # Find existing value from table
40             my $find_sth = Protocol::PostgreSQL::Statement->new(
41             dbh => $dbh,
42             sql => 'select id, name from sometable where id = ?',
43             on_data_row => $set_cache,
44             on_no_data => sub {
45             my ($sth) = shift;
46             warn "No data found, inserting\n";
47             $add_sth->execute($sth->current_bind_values);
48             }
49             );
50             $find_sth->execute(471, "some data");
51             print "Value for 471 was " . $cache{471};
52              
53             =head1 DESCRIPTION
54              
55             Provides prepared-statement support for L.
56              
57             Sequence of events for a prepared statement:
58              
59             =over 4
60              
61             =item * Parse - check the supplied SQL, generate a prepared statement
62              
63             =item * Bind - binds values to a statement to generate a portal ('' is the empty portal)
64              
65             =item * Execute - execute a given portal
66              
67             =item * Sync - inform the server we're done and that we want to go back to L state.
68              
69             =back
70              
71             Once an execute is running, we avoid sending anything else to the server until we get a ReadyForQuery response.
72              
73             On instantiation, the statement will be parsed immediately. When this is complete, we are able to bind then execute.
74             Any requests to bind or execute before the statement is ready will be queued.
75              
76             =cut
77              
78             =head1 METHODS
79              
80             =cut
81              
82             =head2 new
83              
84             Instantiate a new object, takes the following named parameters:
85              
86             =over 4
87              
88             =item * dbh - L-compatible object for the parent database handle
89              
90             =item * sql - actual SQL query to run, with placeholders specified as ?
91              
92             =item * statement - name to assign to this statement
93              
94             =back
95              
96             Will send the parse request immediately.
97              
98             =cut
99              
100             sub new {
101 1     1 1 6 my $class = shift;
102 1         4 my %args = @_;
103 1 50       5 die "No DBH?" unless $args{dbh};
104 1 50       4 die "No SQL?" unless defined $args{sql};
105              
106 1 50       18 my $self = bless {
107             dbh => $args{dbh},
108             sql => $args{sql},
109             (exists $args{statement})
110             ? (statement => $args{statement})
111             : (),
112             state => 'parsing',
113             rows_seen => 0,
114             data_row => delete $args{data_row},
115             no_data => delete $args{no_data},
116             command_complete => delete $args{command_complete},
117             bind_pending => [],
118             execute_pending => [],
119             }, $class;
120 1 50       5 $self->{on_ready} = delete $args{on_ready} if exists $args{on_ready};
121              
122             # We queue an initial Parse request. When we get around to sending it, we'll push a describe over as well.
123             $self->dbh->queue(
124             callback => $self->sap(sub {
125 1     1   3 my $self = shift;
126 1         2 my ($dbh) = shift;
127 1         4 $dbh->debug('Sent Parse request, queuing describe');
128 1         5 $self->describe;
129 1 50       4 }),
130             type => 'Parse',
131             parameters => [
132             sql => $args{sql},
133             (exists $args{statement})
134             ? (statement => $args{statement})
135             : ()
136             ]
137             );
138 1         7 return $self;
139             }
140              
141             =head1 C
142              
143             Callback when parsing is complete.
144              
145             =cut
146              
147             sub parse_complete {
148 0     0 0 0 my $self = shift;
149 0         0 $self->{state} = 'describing';
150             }
151              
152             =head2 execute
153              
154             Bind variables to the current statement.
155              
156             =cut
157              
158             sub execute {
159 0     0 1 0 my $self = shift;
160 0         0 my $param = [ @_ ];
161 0 0       0 my $msg = $self->dbh->message(
162             'Bind',
163             param => $param,
164             sth => $self,
165             (exists $self->{statement})
166             ? (
167             statement => $self->{statement},
168             portal => $self->{statement},
169             )
170             : ()
171             );
172              
173 0 0       0 if($self->{state} eq 'ready') {
174 0         0 $self->{state} = 'bind';
175             $self->dbh->queue(
176             message => $msg,
177             callback => $self->sap(sub {
178 0     0   0 my $self = shift;
179 0         0 $self->{state} = 'ready';
180 0         0 $self->{current_bind_values} = $param;
181 0         0 $self->_execute;
182             })
183 0         0 );
184             } else {
185 0         0 push @{ $self->{bind_pending} }, $msg;
  0         0  
186             }
187 0         0 return $self;
188             }
189              
190             =head2 current_bind_values
191              
192             Returns the bind values from the currently-executing query, suitable for passing to L.
193              
194             =cut
195              
196             sub current_bind_values {
197 0     0 1 0 my $self = shift;
198 0 0       0 return unless $self->{current_bind_values};
199 0         0 return @{ $self->{current_bind_values} };
  0         0  
200             }
201              
202             =head2 data_row
203              
204             Callback when we have a data row.
205              
206             Maintains a running count of how many rows we've seen, and passes the data on to the C callback if defined.
207              
208             =cut
209              
210             sub data_row {
211 0     0 1 0 my $self = shift;
212 0         0 ++$self->{rows_seen};
213 0 0       0 return $self unless $self->{data_row};
214              
215 0         0 my $row = shift;
216 0 0       0 $self->{data_row}->($self, $row) if exists $self->{data_row};
217             }
218              
219             =head2 command_complete
220              
221             Callback for end of statement. We'll hit this if we completed without error and there's no more data available to read.
222              
223             Will call the C callback if we had no rows, and the C callback in either case.
224              
225             =cut
226              
227             sub command_complete {
228 0     0 1 0 my $self = shift;
229 0 0 0     0 $self->{no_data}->($self) if $self->{no_data} && !$self->{rows_seen};
230 0 0       0 $self->{command_complete}->($self) if $self->{command_complete};
231 0         0 $self->{rows_seen} = 0;
232 0         0 return $self;
233             }
234              
235             =head2 bind_complete
236              
237             Called when the bind is complete. Since our bind+execute handling is currently combined, this doesn't
238             do anything at the moment.
239              
240             =cut
241              
242             sub bind_complete {
243 0     0 1 0 my $self = shift;
244              
245             # $self->_execute;
246 0         0 return $self;
247             }
248              
249             =head2 _execute
250              
251             Execute this query.
252              
253             =cut
254              
255             sub _execute {
256 0     0   0 my $self = shift;
257 0 0 0     0 if($self->{state} eq 'ready' || $self->{state} eq 'bind') {
258 0         0 $self->dbh->row_description($self->row_description);
259 0 0       0 $self->dbh->send_message(
260             'Execute',
261             param => [ @_ ],
262             sth => $self,
263             (exists $self->{statement})
264             ? (portal => $self->{statement})
265             : ()
266             );
267 0         0 $self->dbh->send_message(
268             'Sync',
269             );
270             } else {
271 0         0 $self->{execute_pending} = 1;
272             }
273             }
274              
275             =head2 describe
276              
277             Describe this query. Causes PostgreSQL to send RowDescription response indicating what we expect to get back from the
278             server. Beats trying to parse the query for ourselves although it incurs an extra send/receive for each statement.
279              
280             =cut
281              
282             sub describe {
283 1     1 1 2 my $self = shift;
284 1         4 $self->{state} = 'describing';
285 1 50       3 $self->dbh->send_message(
286             'Describe',
287             param => [ @_ ],
288             (exists $self->{statement})
289             ? (statement => $self->{statement})
290             : (),
291             sth => $self,
292             );
293 1         4 $self->dbh->debug('describe complete, now ready');
294 1         2 $self->{state} = 'ready';
295 1         4 $self->on_ready();
296             }
297              
298             =head2 row_description
299              
300             Accessor to return or update the internal row description information.
301              
302             =cut
303              
304             sub row_description {
305 0     0 1 0 my $self = shift;
306 0 0       0 if(@_) {
307 0         0 $self->{row_description} = shift;
308 0         0 return $self;
309             }
310 0         0 return $self->{row_description};
311             }
312              
313             =head2 on_ready
314              
315             Called when we've finished parsing and describing this query.
316              
317             =cut
318              
319             sub on_ready {
320 1     1 1 2 my $self = shift;
321              
322 1 50       2 if(my $msg = shift(@{ $self->{bind_pending} })) {
  1         5  
323 0         0 $self->dbh->debug("have bind pending");
324 0         0 $self->{state} = 'binding';
325             $self->dbh->queue(
326             message => $msg,
327             callback => $self->sap(sub {
328 0     0   0 my $self = shift;
329 0         0 $self->{state} = 'ready';
330 0         0 $self->_execute;
331             })
332 0         0 );
333             } else {
334 1 50       6 $self->{on_ready}->() if exists $self->{on_ready};
335             }
336             }
337              
338             sub discard {
339 0     0 0 0 my $self = shift;
340 0         0 my %args = @_;
341              
342             # $self->add_handler_for_event(
343             # ) if exists $args{on_complete};
344              
345             $self->dbh->send_message(
346             'Close',
347             statement => defined($self->{statement}) ? $self->{statement} : '',
348             (exists $args{on_complete})
349 0     0   0 ? (on_complete => sub { $args{on_complete}->(); 0 })
  0         0  
350 0 0       0 : (),
    0          
351             sth => $self,
352             );
353 0         0 $self->dbh->send_message(
354             'Sync',
355             );
356             }
357              
358             sub on_close_complete {
359 0     0 0 0 my $self = shift;
360 0         0 $self->invoke_event('close_complete' => );
361 0         0 return $self;
362             }
363              
364             =head2 finish
365              
366             Finish the current statement.
367              
368             Should issue a Sync to trigger a ReadyForQuery response, but that's now handled elsewhere.
369              
370             =cut
371              
372             sub finish {
373 0     0 1 0 my $self = shift;
374             # $self->dbh->send_message('Sync');
375             }
376              
377             =head2 dbh
378              
379             Accessor for the database handle (L object).
380              
381             =cut
382              
383 3     3 1 108 sub dbh { shift->{dbh} }
384              
385             =head2 sap
386              
387             Generate a callback with weakened copy of $self.
388              
389             =cut
390              
391             sub sap {
392 1     1 1 2 my $self = shift;
393 1         2 my $code = shift;
394 1         6 Scalar::Util::weaken $self;
395 1     1   14 return sub { $code->($self, @_) };
  1         3  
396             }
397              
398             1;
399              
400             __END__