File Coverage

blib/lib/Lim/Util/DBI.pm
Criterion Covered Total %
statement 27 275 9.8
branch 0 120 0.0
condition 0 29 0.0
subroutine 9 29 31.0
pod 16 16 100.0
total 52 469 11.0


line stmt bran cond sub pod time code
1             package Lim::Util::DBI;
2              
3 1     1   9171 use common::sense;
  1         2  
  1         11  
4 1     1   69 use Carp;
  1         1  
  1         97  
5 1     1   8 use Scalar::Util qw(weaken);
  1         2  
  1         54  
6              
7 1     1   7 use Log::Log4perl ();
  1         3  
  1         17  
8 1     1   804126 use DBI ();
  1         37454  
  1         43  
9 1     1   12 use JSON::XS ();
  1         2  
  1         14  
10              
11 1     1   6 use AnyEvent ();
  1         1  
  1         15  
12 1     1   7 use AnyEvent::Util ();
  1         2  
  1         13  
13              
14 1     1   4 use Lim ();
  1         2  
  1         4020  
15              
16             =encoding utf8
17              
18             =head1 NAME
19              
20             Lim::Util::DBI - Create a DBH that is executed in a forked process
21              
22             =head1 VERSION
23              
24             See L for version.
25              
26             =cut
27              
28             our $VERSION = $Lim::VERSION;
29             our %METHOD = (
30             connect => 1,
31             disconnect => 1,
32             execute => 1,
33             begin_work => 1,
34             commit => 1,
35             rollback => 1
36             );
37              
38             =head1 SYNOPSIS
39              
40             =over 4
41              
42             use Lim::Util::DBI;
43              
44             =back
45              
46             =head1 METHODS
47              
48             =over 4
49              
50             =item new
51              
52             =cut
53              
54             sub new {
55 0     0 1   my $this = shift;
56 0   0       my $class = ref($this) || $this;
57 0           my $dbi = shift;
58 0           my $user = shift;
59 0           my $password = shift;
60 0           my %args = ( @_ );
61 0           my $self = {
62             logger => Log::Log4perl->get_logger,
63             json => JSON::XS->new->ascii->convert_blessed,
64             busy => 0
65             };
66 0           bless $self, $class;
67 0           my $real_self = $self;
68 0           weaken($self);
69            
70 0 0         unless (defined $dbi) {
71 0           confess __PACKAGE__, ': Missing dbi connection string';
72             }
73 0 0 0       unless (defined $args{on_connect} and ref($args{on_connect}) eq 'CODE') {
74 0           confess __PACKAGE__, ': Missing on_connect or it is not CODE';
75             }
76            
77 0           my $on_connect = delete $args{on_connect};
78              
79 0 0         if (defined $args{on_error}) {
80 0 0         unless (ref($args{on_error}) eq 'CODE') {
81 0           confess __PACKAGE__, ': on_error is not CODE';
82             }
83 0           $self->{on_error} = delete $args{on_error};
84             }
85              
86 0           my ($child, $parent) = AnyEvent::Util::portable_socketpair;
87 0 0 0       unless (defined $child and defined $parent) {
88 0           confess __PACKAGE__, ': Unable to create client/server socket pairs: ', $!;
89             }
90              
91 0           AnyEvent::Util::fh_nonblocking $child, 1;
92 0           $self->{child} = $child;
93              
94 0           my $pid = fork;
95            
96 0 0         if ($pid) {
    0          
97             #
98             # Parent process
99             #
100            
101 0           close $parent;
102              
103 0           $self->{child_pid} = $pid;
104             $self->{child_watcher} = AnyEvent->io(
105             fh => $child,
106             poll => 'r',
107             cb => sub {
108 0 0 0 0     unless (defined $self and exists $self->{child}) {
109 0           return;
110             }
111            
112 0           my $response;
113 0           my $len = sysread $self->{child}, my $buf, 64*1024;
114 0 0         if ($len > 0) {
    0          
    0          
115 0           undef $@;
116            
117 0           eval {
118 0           $response = $self->{json}->incr_parse($buf);
119             };
120 0 0         if ($@) {
121 0 0         Lim::DEBUG and $self->{logger}->debug('Response JSON parse failed: ', $@);
122 0           $response = [];
123             }
124             else {
125 0           my $errstr = shift @$response;
126 0 0         if ($errstr) {
127 0           $@ = $errstr;
128             }
129             }
130             }
131             elsif (defined $len) {
132 0           $@ = 'Unexpected EOF';
133 0 0         Lim::DEBUG and $self->{logger}->debug($@);
134            
135 0           shutdown($self->{child}, 2);
136 0           close(delete $self->{child});
137 0           $response = [];
138             }
139             elsif ($! != Errno::EAGAIN) {
140 0           $@ = 'Unable to read from child: '.$!;
141 0 0         Lim::DEBUG and $self->{logger}->debug($@);
142              
143 0           shutdown($self->{child}, 2);
144 0           close(delete $self->{child});
145 0           $response = [];
146             }
147            
148 0 0 0       if (defined $response and exists $self->{cb}) {
149 0 0         unless (ref($response) eq 'ARRAY') {
150 0           $@ = 'Invalid response';
151 0 0         Lim::DEBUG and $self->{logger}->debug($@);
152 0           $response = [];
153             }
154            
155 0           my $cb = delete $self->{cb};
156 0           $self->{busy} = 0;
157 0           $cb->($self, @$response);
158             }
159 0           });
160             }
161             elsif (defined $pid) {
162             #
163             # Child process
164             #
165              
166 0           $SIG{HUP} => 'IGNORE';
167 0           $SIG{INT} => 'IGNORE';
168 0           $SIG{TERM} => 'IGNORE';
169 0           $SIG{PIPE} => 'IGNORE';
170 0           $SIG{QUIT} => 'IGNORE';
171 0           $SIG{ALRM} => 'IGNORE';
172              
173 0           Log::Log4perl->init( \q(
174             log4perl.threshold = OFF
175             log4perl.logger = DEBUG, Screen
176             log4perl.appender.Screen = Log::Log4perl::Appender::Screen
177             log4perl.appender.Screen.stderr = 0
178             log4perl.appender.Screen.layout = Log::Log4perl::Layout::PatternLayout
179             log4perl.appender.Screen.layout.ConversionPattern = %d %F [%L] %p: %m%n
180             ) );
181              
182 0 0 0       if (exists $self->{close_fds} and $self->{close_fds}) {
183 0           my $parent_fno = fileno $parent;
184            
185 0   0       foreach ($^F+1 .. (POSIX::sysconf (&POSIX::_SC_OPEN_MAX) || 1024)) {
186 0 0         unless ($_ == $parent_fno) {
187 0           POSIX::close($_);
188             }
189             }
190             }
191            
192 0           while () {
193 0           my $request;
194 0           my $len = sysread $parent, my $buf, 64*1024;
195 0 0         if ($len > 0) {
    0          
196 0           undef $@;
197            
198 0           eval {
199 0           $request = $self->{json}->incr_parse($buf);
200             };
201 0 0         if ($@) {
202 0           last;
203             }
204             }
205             elsif (defined $len) {
206 0           last;
207             }
208             else {
209 0           last;
210             }
211            
212 0 0         if (defined $request) {
213 0 0         unless (ref($request) eq 'ARRAY') {
214 0           last;
215             }
216            
217 0           my $response = $self->process(@$request);
218            
219 0           undef $@;
220 0           eval {
221 0           $response = $self->{json}->encode($response);
222             };
223 0 0         if ($@) {
224 0           my $errstr = $@;
225 0           undef $@;
226 0           eval {
227 0           $response = $self->{json}->encode([$errstr]);
228             };
229 0 0         if ($@) {
230 0           last;
231             }
232             }
233            
234 0           my $wrote = 0;
235 0           my $res_len = length $response;
236 0           while () {
237 0           $len = syswrite $parent, $response, 64*1024;
238 0 0 0       unless (defined $len and $len > 0) {
239 0           last;
240             }
241            
242 0           $wrote += $len;
243            
244 0 0         if ($wrote >= $res_len) {
245 0           last;
246             }
247            
248 0           $response = substr $response, $len;
249             }
250 0 0         if ($wrote != $res_len) {
251 0           last;
252             }
253             }
254             }
255 0           shutdown($parent, 2);
256 0           close($parent);
257 0           exit(0);
258             }
259             else {
260 0           confess __PACKAGE__, ': Unable to fork: ', $!;
261             }
262            
263 0           $self->connect($on_connect, $dbi, $user, $password, %args);
264              
265 0 0         Lim::OBJ_DEBUG and $self->{logger}->debug('new ', __PACKAGE__, ' ', $self);
266 0           $self;
267             }
268              
269             sub DESTROY {
270 0     0     my ($self) = @_;
271 0 0         Lim::OBJ_DEBUG and $self->{logger}->debug('destroy ', __PACKAGE__, ' ', $self);
272              
273 0 0         if (exists $self->{child_pid}) {
274 0           my $child_watcher; $child_watcher = AnyEvent->child(
275             pid => $self->{child_pid},
276             cb => sub {
277 0     0     undef $child_watcher;
278 0           });
279             }
280            
281 0 0         if (exists $self->{child}) {
282 0           shutdown($self->{child}, 2);
283 0           close($self->{child});
284             }
285             }
286              
287             =item process
288              
289             =cut
290              
291             sub process {
292 0     0 1   my $self = shift;
293 0           my $method = shift;
294 0           my $response;
295            
296 0 0         unless (exists $METHOD{$method}) {
297 0           return ['Method '.$method.' is not allowed'];
298             }
299            
300 0           $method = 'child_'.$method;
301 0           eval {
302 0           $response = $self->$method(@_);
303             };
304 0 0         if ($@) {
305 0           return [$@];
306             }
307            
308 0           return $response;
309             }
310              
311             =item request
312              
313             =cut
314              
315             sub request {
316 0     0 1   my $self = shift;
317 0           my $cb = shift;
318 0           my ($method) = @_;
319 0           my $request;
320 0           weaken($self);
321            
322 0           undef $@;
323            
324 0 0         unless (ref($cb) eq 'CODE') {
325 0           confess __PACKAGE__, 'cb is not CODE';
326             }
327              
328 0 0         unless (exists $METHOD{$method}) {
329 0           $@ = 'Method '.$method.' is not allowed';
330 0           $cb->();
331 0           return;
332             }
333            
334 0 0         unless (exists $self->{child}) {
335 0           $@ = 'No connection to the DBI process';
336 0           $cb->();
337 0           return;
338             }
339            
340 0 0         if ($self->{busy}) {
341 0           $@ = 'DBH is busy, multiple command execution is not allowed';
342 0           $cb->();
343 0           return;
344             }
345            
346 0           eval {
347 0           $request = $self->{json}->encode(\@_);
348             };
349 0 0         if ($@) {
350 0           $cb->();
351 0           return;
352             }
353            
354 0           $self->{busy} = 1;
355 0           $self->{cb} = $cb;
356            
357 0 0         Lim::DEBUG and $self->{logger}->debug('Sending DBI request ', $method);
358              
359 0           my $len = syswrite $self->{child}, $request;
360 0 0 0       unless (defined $len and $len > 0) {
361 0           $@ = 'Connection broken';
362 0           $self->kill;
363 0           $cb->();
364 0           return;
365             }
366            
367 0 0         unless ($len >= length $request) {
368 0           $request = substr $request, $len;
369            
370             $self->{request_watcher} = AnyEvent->io(
371             fh => $request,
372             poll => 'w',
373             cb => sub {
374 0 0   0     unless (defined $self) {
375 0           return;
376             }
377            
378 0           $len = syswrite $self->{child}, $request;
379 0 0 0       unless (defined $len and $len > 0) {
380 0           $@ = 'Connection broken';
381 0           my $cb = $self->{cb};
382 0           $self->kill;
383 0           $cb->();
384 0           return;
385             }
386            
387 0 0         unless ($len >= length $request) {
388 0           $request = substr $request, $len;
389 0           return;
390             }
391            
392 0           delete $self->{request_watcher};
393 0           });
394             }
395            
396 0           return 1;
397             }
398              
399             =item kill
400              
401             =cut
402              
403             sub kill {
404 0     0 1   my ($self) = @_;
405            
406 0 0         if (exists $self->{child}) {
407 0           shutdown($self->{child}, 2);
408 0           close(delete $self->{child});
409             }
410            
411 0           delete $self->{child_watcher};
412 0           delete $self->{request_watcher};
413 0           $self->{busy} = 0;
414 0           delete $self->{cb};
415             }
416              
417             =item child_connect
418              
419             =cut
420              
421             sub child_connect {
422 0     0 1   my ($self, $dbi, $user, $pass, $attr) = @_;
423            
424 0 0         unless (($self->{dbh} = DBI->connect($dbi, $user, $pass, $attr))) {
425 0           return [$DBI::errstr];
426             }
427              
428 0           [0, 1];
429             }
430              
431             =item child_disconnect
432              
433             =cut
434              
435             sub child_disconnect {
436 0     0 1   my ($self) = @_;
437            
438 0 0         unless (defined $self->{dbh}) {
439 0           return ['No connect to the database exists'];
440             }
441            
442 0           $self->{dbh}->disconnect;
443 0           delete $self->{dbh};
444            
445 0           [0, 1];
446             }
447              
448             =item child_execute
449              
450             =cut
451              
452             sub child_execute {
453 0     0 1   my ($self, $statement, @args) = @_;
454 0           my ($sth, $rv, $rows);
455            
456 0 0         unless (defined $self->{dbh}) {
457 0           return ['No connect to the database exists'];
458             }
459            
460 0 0         unless (($sth = $self->{dbh}->prepare_cached($statement, undef, 1))) {
461 0           return [$DBI::errstr];
462             }
463            
464 0 0         unless (($rv = $sth->execute(@args))) {
465 0           return [$sth->errstr];
466             }
467            
468 0           $rows = $sth->fetchall_arrayref;
469 0           $sth->finish;
470            
471 0           [0, $rows, $rv];
472             }
473              
474             =item child_begin_work
475              
476             =cut
477              
478             sub child_begin_work {
479 0     0 1   my ($self) = @_;
480            
481 0 0         unless (defined $self->{dbh}) {
482 0           return ['No connect to the database exists'];
483             }
484              
485 0 0         unless ($self->{dbh}->begin_work) {
486 0           return [$DBI::errstr];
487             }
488              
489 0           [0, 1];
490             }
491              
492             =item child_commit
493              
494             =cut
495              
496             sub child_commit {
497 0     0 1   my ($self) = @_;
498            
499 0 0         unless (defined $self->{dbh}) {
500 0           return ['No connect to the database exists'];
501             }
502              
503 0 0         unless ($self->{dbh}->commit) {
504 0           return [$DBI::errstr];
505             }
506              
507 0           [0, 1];
508             }
509              
510             =item child_rollback
511              
512             =cut
513              
514             sub child_rollback {
515 0     0 1   my ($self) = @_;
516            
517 0 0         unless (defined $self->{dbh}) {
518 0           return ['No connect to the database exists'];
519             }
520              
521 0 0         unless ($self->{dbh}->rollback) {
522 0           return [$DBI::errstr];
523             }
524              
525 0           [0, 1];
526             }
527              
528             =item connect
529              
530             =cut
531              
532             sub connect {
533 0     0 1   my ($self, $cb, $dbi, $user, $pass, %attr) = @_;
534            
535 0           $self->request($cb, 'connect', $dbi, $user, $pass, \%attr);
536             }
537              
538             =item disconnect
539              
540             =cut
541              
542             sub disconnect {
543 0     0 1   my ($self, $cb) = @_;
544            
545 0           $self->request($cb, 'disconnect');
546             }
547              
548             =item execute
549              
550             =cut
551              
552             sub execute {
553 0     0 1   my $cb = pop(@_);
554 0           my ($self, $statement, @args) = @_;
555            
556 0           $self->request($cb, 'execute', $statement, @args);
557             }
558              
559             =item begin_work
560              
561             =cut
562              
563             sub begin_work {
564 0     0 1   my ($self, $cb) = @_;
565            
566 0           $self->request($cb, 'begin_work');
567             }
568              
569             =item commit
570              
571             =cut
572              
573             sub commit {
574 0     0 1   my ($self, $cb) = @_;
575            
576 0           $self->request($cb, 'commit');
577             }
578              
579             =item rollback
580              
581             =cut
582              
583             sub rollback {
584 0     0 1   my ($self, $cb) = @_;
585            
586 0           $self->request($cb, 'rollback');
587             }
588              
589             =back
590              
591             =head1 AUTHOR
592              
593             Jerry Lundström, C<< >>
594              
595             =head1 BUGS
596              
597             Please report any bugs or feature requests to L.
598              
599             =head1 SUPPORT
600              
601             You can find documentation for this module with the perldoc command.
602              
603             perldoc Lim::Util::DBI
604              
605             You can also look for information at:
606              
607             =over 4
608              
609             =item * Lim issue tracker (report bugs here)
610              
611             L
612              
613             =back
614              
615             =head1 ACKNOWLEDGEMENTS
616              
617             =head1 LICENSE AND COPYRIGHT
618              
619             Copyright 2012-2013 Jerry Lundström.
620              
621             This program is free software; you can redistribute it and/or modify it
622             under the terms of either: the GNU General Public License as published
623             by the Free Software Foundation; or the Artistic License.
624              
625             See http://dev.perl.org/licenses/ for more information.
626              
627              
628             =cut
629              
630             1; # End of Lim::Util::DBI