File Coverage

blib/lib/POE/Component/EasyDBI.pm
Criterion Covered Total %
statement 231 390 59.2
branch 88 206 42.7
condition 20 78 25.6
subroutine 30 35 85.7
pod 1 19 5.2
total 370 728 50.8


line stmt bran cond sub pod time code
1             package POE::Component::EasyDBI;
2              
3 2     2   468203 use strict;
  2         22  
  2         67  
4 2     2   13 use warnings FATAL =>'all';
  2         7  
  2         128  
5              
6             # Initialize our version
7             our $VERSION = '1.29';
8              
9             # Import what we need from the POE namespace
10 2     2   11 use POE;
  2         4  
  2         12  
11 2     2   693 use POE::Session;
  2         9  
  2         8  
12 2     2   1135 use POE::Filter::Reference;
  2         15138  
  2         59  
13 2     2   936 use POE::Filter::Line;
  2         3271  
  2         69  
14 2     2   1342 use POE::Wheel::Run;
  2         35522  
  2         74  
15 2     2   1376 use POE::Component::EasyDBI::SubProcess;
  2         8  
  2         98  
16              
17 2     2   1115 use Params::Util qw( _ARRAY _HASH );
  2         13206  
  2         182  
18 2     2   15 use Scalar::Util qw( reftype );
  2         4  
  2         81  
19              
20              
21             # Miscellaneous modules
22 2     2   45 use Carp;
  2         5  
  2         157  
23 2     2   13 use vars qw($AUTOLOAD);
  2         5  
  2         9795  
24              
25             our %COMMANDS = map { $_ => 1 } qw(
26             commit
27             rollback
28             begin_work
29             func
30             method
31             insert
32             do
33             single
34             quote
35             arrayhash
36             hashhash
37             hasharray
38             array
39             arrayarray
40             hash
41             keyvalhash
42             shutdown
43             combo
44             );
45              
46             sub AUTOLOAD {
47 13     13   7698 my $self = shift;
48 13         22 my $method = $AUTOLOAD;
49 13         102 $method =~ s/.*:://;
50 13 50       53 my $call = ( $method =~ s/^_// ) ? 1 : 0;
51              
52 13 50       73 return unless $method =~ /[^A-Z]/;
53              
54 13         35 $method = lc( $method );
55              
56             croak "EasyDBI command $method does not exist"
57 13 50       38 unless ( exists( $COMMANDS{ $method } ) );
58              
59 13 100       175 my @args = ( $method eq 'shutdown' ) ? @_ : { @_ };
60 13 50       33 if ( $call ) {
61 0         0 $poe_kernel->call($self->{ID} => $method => @args);
62             } else {
63 13         55 $poe_kernel->post($self->{ID} => $method => @args);
64             }
65             }
66              
67             # TODO use constants?
68             sub MAX_RETRIES () { 5 }
69             sub DEBUG () { 0 }
70              
71             # Autoflush on STDOUT
72             # Select returns last selected handle
73             # So, reselect it after selecting STDOUT and setting Autoflush
74             #select((select(STDOUT), $| = 1)[0]);
75              
76             sub spawn {
77 1     1 0 1700 my ($self,$session) = &new;
78 1         13 return $session;
79             }
80              
81             sub create {
82 0     0 0 0 &spawn;
83             }
84              
85             sub new {
86 2     2 0 1569 my $class = shift;
87              
88             # The options hash
89 2         5 my %opt;
90              
91             # Support passing in a hash ref or a regular hash
92 2 50 33     16 if ((@_ & 1) && _HASH($_[0])) {
93 0         0 %opt = %{$_[0]};
  0         0  
94             } else {
95             # Sanity checking
96 2 50       7 if (@_ & 1) {
97 0         0 croak('POE::Component::EasyDBI requires an even number of options '
98             .'passed to new()/spawn() call');
99             }
100 2         31 %opt = @_;
101             }
102              
103             # lowercase keys
104 2         10 %opt = map { lc($_) => $opt{$_} } keys %opt;
  16         49  
105              
106 2         27 my @valid = qw(
107             dsn
108             username
109             password
110             options
111             alias
112             max_retries
113             ping_timeout
114             use_cancel
115             no_connect_failures
116             connect_error
117             reconnect_wait
118             connected
119             no_warnings
120             sig_ignore_off
121             no_cache
122             alt_fork
123             stopwatch
124             query_logger
125             );
126              
127             # check the DSN
128             # username/password/port other options
129             # should be part of the DSN
130 2 50       9 if (!exists($opt{dsn})) {
131 0         0 croak('DSN is required to spawn a new POE::Component::EasyDBI '
132             .'instance!');
133             }
134              
135             # check the USERNAME
136 2 50       9 if (!exists($opt{username})) {
137 0         0 croak('username is required to spawn a new POE::Component::EasyDBI '
138             .'instance!');
139             }
140              
141             # check the PASSWORD
142 2 50       8 if (!exists($opt{password})) {
143 0         0 croak('password is required to spawn a new POE::Component::EasyDBI '
144             .'instance!');
145             }
146              
147             # check the reconnect wait time
148 2 50 33     19 if (exists($opt{reconnect_wait}) && $opt{reconnect_wait} < 1) {
    50          
149 0         0 warn('A reconnect_wait of less than 1 second is NOT recommended, '
150             .'continuing anyway');
151             } elsif (!$opt{reconnect_wait}) {
152 2         7 $opt{reconnect_wait} = 2;
153             }
154              
155             # check the session alias
156 2 50       9 if (!exists($opt{alias})) {
157             # Debugging info...
158 0         0 DEBUG && warn 'Using default Alias EasyDBI';
159              
160             # Set the default
161 0         0 $opt{alias} = 'EasyDBI';
162             }
163              
164             # check for connect error event
165 2 50       11 if (exists($opt{connect_error})) {
166 2 50       23 if (_ARRAY($opt{connect_error})) {
167 2 50       6 unless ($#{$opt{connect_error}} > 0) {
  2         11  
168 0         0 warn('connect_error must be an array reference that contains '
169             .'at least a session and event, ignoring');
170 0         0 delete $opt{connect_error};
171             }
172             } else {
173 0         0 warn('connect_error must be an array reference that contains at '
174             .'least a session and event, ignoring');
175 0         0 delete $opt{connect_error};
176             }
177             }
178              
179             # check for connect event
180 2 50       7 if (exists($opt{connected})) {
181 2 50       10 if (_ARRAY($opt{connected})) {
182 2 50       4 unless ($#{$opt{connected}} > 0) {
  2         10  
183 0         0 warn('connected must be an array reference that contains '
184             .'at least a session and event, ignoring');
185 0         0 delete $opt{connected};
186             }
187             } else {
188 0         0 warn('connected must be an array reference that contains at '
189             .'least a session and event, ignoring');
190 0         0 delete $opt{connected};
191             }
192             }
193              
194 2 50       9 if (exists($opt{options})) {
195 0 0       0 unless (_HASH($opt{options})) {
196 0         0 warn('options must be a hash ref, ignoring');
197 0         0 delete $opt{options};
198             }
199             }
200              
201 2 50       6 if ($opt{stopwatch}) {
202 2     2   14 eval "use Time::Stopwatch";
  2         4  
  2         24  
  2         129  
203 2 50       13 if ($@) {
204 0         0 warn('cannot use stopwatch (ignored), Time::Stopwatch not installed? ');
205 0         0 delete $opt{stopwatch};
206             }
207             }
208              
209 2         8 my $keep = { map { $_ => delete $opt{$_} } @valid };
  36         89  
210              
211             # Anything left over is unrecognized
212 2 50       11 if (keys %opt) {
213 0         0 croak('Unrecognized keys/options ('.join(',',(keys %opt))
214             .') were present in new() call to POE::Component::EasyDBI!');
215             }
216              
217 2   33     17 my $self = bless($keep,ref $class || $class);
218              
219             # Create a new session for ourself
220             my $session = POE::Session->create(
221             # Our subroutines
222             'object_states' => [
223             $self => {
224             # Maintenance events
225             '_start' => 'start',
226             '_stop' => 'stop',
227             'setup_wheel' => 'setup_wheel',
228             'sig_child' => 'sig_child',
229              
230             # child events
231             'child_error' => 'child_error',
232             'child_closed' => 'child_closed',
233             'child_STDOUT' => 'child_STDOUT',
234             'child_STDERR' => 'child_STDERR',
235              
236             # database events
237 36         229 (map { $_ => 'db_handler', uc($_) => 'db_handler' } keys %COMMANDS),
238              
239             # redefine
240             'combo' => 'combo_query',
241             'COMBO' => 'combo_query',
242             'shutdown' => 'shutdown_poco',
243             'SHUTDOWN' => 'shutdown_poco',
244              
245              
246             # Queue handling
247             'send_query' => 'send_query',
248             'check_queue' => 'check_queue',
249             'print_queue' => 'print_queue',
250             },
251             ],
252              
253             # Set up the heap for ourself
254             'heap' => {
255             # The queue of DBI calls
256             'queue' => [],
257             'idcounter' => 0,
258              
259             # The Wheel::Run object placeholder
260             'wheel' => undef,
261              
262             # How many times have we re-created the wheel?
263             'retries' => 0,
264              
265             # Are we shutting down?
266             'shutdown' => 0,
267              
268             # Valid options
269             'opts' => $keep,
270              
271             # The alia/s we will run under
272             'alias' => $keep->{alias},
273              
274             # Number of times to retry connection
275             # (if connection failures option is off)
276             'max_retries' => $keep->{max_retries} || MAX_RETRIES,
277              
278             # Connection failure option
279 2 50 50     21 'no_connect_failures' => $keep->{no_connect_failures} || 0,
      50        
280              
281             # extra params for actions
282             action_params => {
283             commit => [],
284             rollback => [],
285             begin_work => [],
286             func => [qw( args )],
287             method => [qw( function args )],
288             single => [qw( separator )],
289             insert => [qw( insert hash table last_insert_id )],
290             array => [qw( chunked separator )],
291             arrayarray => [qw( chunked )],
292             keyvalhash => [qw( primary_key chunked )],
293             hashhash => [qw( primary_key chunked )],
294             hasharray => [qw( primary_key chunked )],
295             arrayhash => [qw( chunked )],
296             },
297             },
298             ) or die 'Unable to create a new session!';
299              
300             # save the session id
301 2         1070 $self->{ID} = $session->ID;
302              
303 2 100       184 return wantarray ? ($self,$session) : $self;
304             }
305              
306             # This subroutine handles shutdown signals
307             sub shutdown_poco {
308 2     2 0 769 my ($kernel, $heap) = @_[KERNEL,HEAP];
309              
310             # Check for duplicate shutdown signals
311 2 50       12 if ($heap->{shutdown}) {
312             # Okay, let's see what's going on
313 0 0 0     0 if ($heap->{shutdown} == 1 && ! defined $_[ARG0]) {
    0          
314             # Duplicate shutdown events
315 0         0 DEBUG && warn 'Duplicate shutdown event fired!';
316 0         0 return;
317             } elsif ($heap->{shutdown} == 2) {
318             # Tried to shutdown_NOW again...
319 0         0 DEBUG && warn 'Duplicate shutdown NOW fired!';
320 0         0 return;
321             }
322             }
323              
324             # Check if we got "NOW"
325 2 50 33     14 if (defined($_[ARG0]) && uc($_[ARG0]) eq 'NOW') {
326             # Actually shut down!
327 0         0 $heap->{shutdown} = 2;
328              
329 0 0       0 if ($heap->{wheel}) {
330             # KILL our subprocess
331 0         0 $heap->{wheel}->kill(9);
332             }
333              
334             # Delete the wheel, so we have nothing to keep
335             # the GC from destructing us...
336 0         0 delete $heap->{wheel};
337              
338             # Go over our queue, and do some stuff
339 0         0 foreach my $queue (@{$heap->{queue}}) {
  0         0  
340             # Skip the special EXIT actions we might have put on the queue
341 0 0 0     0 if ($queue->{action} && $queue->{action} eq 'EXIT') { next }
  0         0  
342              
343             # Post a failure event to all the queries on the Queue
344             # informing them that we have been shutdown...
345 0         0 $queue->{error} = 'POE::Component::EasyDBI was '
346             .'shut down forcibly!';
347 0         0 $kernel->post( $queue->{session}, $queue->{event}, $queue);
348              
349             # Argh, decrement the refcount
350 0         0 $kernel->refcount_decrement( $queue->{session}, 'EasyDBI' );
351             }
352              
353             # Tell the kernel to kill us!
354 0         0 $kernel->signal( $_[SESSION], 'KILL' );
355             } else {
356             # Gracefully shut down...
357 2         6 $heap->{shutdown} = 1;
358              
359             # Put into the queue EXIT for the child
360 2         16 $kernel->yield( 'send_query', {
361             action => 'EXIT',
362             sql => undef,
363             placeholders => undef,
364             }
365             );
366             }
367             }
368              
369             sub combo_query {
370 2     2 0 343 my ($kernel, $heap, $args) = @_[KERNEL,HEAP,ARG0];
371              
372             # Get the arguments
373 2 50       17 unless (_HASH($args)) {
374 0         0 $args = { @_[ ARG0 .. $#_ ] };
375             }
376              
377             # Add some stuff to the args
378             # defaults to sender, but can be specified
379 2 50       30 unless (defined($args->{session})) {
380 2         9 $args->{session} = $_[SENDER]->ID();
381 2         11 DEBUG && print "setting session to $args->{session}\n";
382             }
383              
384 2         8 $args->{action} = $_[STATE];
385              
386 2 50       9 if (!exists($args->{event})) {
387             # Nothing much we can do except drop this quietly...
388 0         0 warn "Did not receive an event argument from caller ".$_[SENDER]->ID
389             ." State: " . $_[STATE] . " Args: " . join('',%$args);
390 0         0 return;
391             } else {
392 2         6 my $a = ref($args->{event});
393 2 0 33     11 unless (!ref($a) || $a =~ m/postback/i || $a =~ m/callback/i) {
      33        
394 0         0 warn "Received an malformed event argument from caller"
395             ." (only postbacks, callbacks and scalar allowed) "
396             .$_[SENDER]->ID." -> State: " . $_[STATE] . " Event: $args->{event}"
397             ." Args: " . %$args;
398 0         0 return;
399             }
400             }
401              
402 2         5 my @res;
403             my $handle = sub {
404 4     4   10 push(@res, shift);
405 4 100       20 $poe_kernel->post( $args->{session} => $args->{event} => @res ) if (delete $res[ -1 ]->{__last});
406 2         53 };
407              
408 2         8 foreach my $i ( 0 .. $#{ $args->{queries} } ) {
  2         11  
409 4         16 my ($type, $arg) = %{ $args->{queries}->[ $i ] };
  4         16  
410              
411             # Copy pass-through options
412 4         8 for my $key ( keys %{ $args } ) {
  4         16  
413 16 100 66     62 next if defined $arg->{$key} || $key eq 'queries';
414 12         30 $arg->{$key} = $args->{$key};
415             }
416              
417 4         9 $arg->{event} = $handle;
418 4         17 $arg->{__last} = ( $i == $#{ $args->{queries} } );
  4         23  
419 4         19 $kernel->call( $_[SESSION] => $type => $arg );
420             }
421              
422 2         17 return;
423             }
424              
425             # This subroutine handles queries
426             sub db_handler {
427 27     27 0 8280 my ($kernel, $heap) = @_[KERNEL,HEAP];
428              
429             # Get the arguments
430 27         56 my $args;
431 27 50       134 if (_HASH($_[ARG0])) {
432 27         80 $args = $_[ARG0];
433             } else {
434 0         0 warn "first parameter must be a hash ref, trying to adjust. "
435             ."(fix this to get rid of this message)";
436 0         0 $args = { @_[ARG0 .. $#_ ] };
437             }
438              
439             # fix a stupid spelling mistake
440 27 50       121 if ($args->{seperator}) {
441 0         0 $args->{separator} = $args->{seperator};
442             }
443              
444             # Add some stuff to the args
445             # defaults to sender, but can be specified
446 27 100       75 unless (defined($args->{session})) {
447 23         70 $args->{session} = $_[SENDER]->ID();
448 23         154 DEBUG && print "setting session to $args->{session}\n";
449             }
450              
451 27         109 $args->{action} = $_[STATE];
452              
453 27 50       70 if (!exists($args->{event})) {
454             # Nothing much we can do except drop this quietly...
455 0         0 warn "Did not receive an event argument from caller ".$_[SENDER]->ID
456             ." State: " . $_[STATE] . " Args: " . join(',',%$args);
457 0         0 return;
458             } else {
459 27         133 my $a = ref($args->{event});
460 27 0 33     107 unless (!ref($a) || $a =~ m/postback/i || $a =~ m/callback/i) {
      33        
461 0         0 warn "Received an malformed event argument from caller"
462             ." (only postbacks, callbacks and scalar allowed) "
463             .$_[SENDER]->ID." -> State: " . $_[STATE] . " Event: $args->{event}"
464             ." Args: " . %$args;
465 0         0 return;
466             }
467             }
468              
469 27 100       77 if (!defined($args->{sql})) {
470 6 50       59 unless ($args->{action} =~ m/^(insert|func|method|commit|rollback|begin_work)$/i) {
471 0         0 $args->{error} = 'sql is not defined!';
472             # Okay, send the error to the Failure Event
473 0         0 $kernel->post($args->{session}, $args->{event}, $args);
474 0         0 return;
475             }
476             } else {
477 21 50       58 if (ref($args->{sql})) {
478 0         0 $args->{error} = 'sql is not a scalar!';
479 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
480 0         0 my $callback = delete $args->{event};
481 0         0 $callback->($args);
482             } else {
483             # send the error to the Failure Event
484 0         0 $kernel->post($args->{session}, $args->{event}, $args);
485             }
486 0         0 return;
487             }
488             }
489              
490             # Check for placeholders
491 27 100       67 if (!defined($args->{placeholders})) {
492             # Create our own empty placeholders
493 23         91 $args->{placeholders} = [];
494             } else {
495 4 50       20 unless (_ARRAY($args->{placeholders})) {
496 0         0 $args->{error} = 'placeholders is not an array!';
497 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
498 0         0 my $callback = delete $args->{event};
499 0         0 $callback->($args);
500             } else {
501             # Okay, send the error to the Failure Event
502 0         0 $kernel->post($args->{session}, $args->{event}, $args);
503             }
504 0         0 return;
505             }
506             }
507              
508             # Check for primary_key on HASHHASH or ARRAYHASH queries
509 27 50 33     199 if ($args->{action} eq 'HASHHASH' || $args->{action} eq 'HASHARRAY') {
510 0 0       0 if (!defined($args->{primary_key})) {
511 0         0 $args->{error} = 'primary_key is not defined! It must '
512             .'be a column name or a 1 based index of a column';
513 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
514 0         0 my $callback = delete $args->{event};
515 0         0 $callback->($args);
516             } else {
517 0         0 $kernel->post($args->{session}, $args->{event}, $args);
518             }
519 0         0 return;
520             } else {
521 0         0 $args->{error} = 'primary_key is not a scalar!';
522 0 0       0 if (ref($args->{sql})) {
523             # send the error to the Failure Event
524 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
525 0         0 my $callback = delete $args->{event};
526 0         0 $callback->($args);
527             } else {
528 0         0 $kernel->post($args->{session}, $args->{event}, $args);
529             }
530 0         0 return;
531             }
532             }
533             }
534              
535             # Check if we have shutdown or not
536 27 50       87 if ($heap->{shutdown}) {
537 0         0 $args->{error} = 'POE::Component::EasyDBI is shutting'
538             .' down now, requests are not accepted!';
539             # Do not accept this query
540 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
541 0         0 my $callback = delete $args->{event};
542 0         0 $callback->($args);
543             } else {
544 0         0 $kernel->post($args->{session}, $args->{event}, $args);
545             }
546 0         0 return;
547             }
548              
549             # Increment the refcount for the session that is sending us this query
550 27         252 $kernel->refcount_increment($args->{session}, 'EasyDBI');
551              
552 27 100       1018 if ($args->{session} ne $_[SENDER]->ID()) {
553 4         23 $kernel->refcount_increment($_[SENDER]->ID(), 'EasyDBI');
554 4         118 $args->{sendersession} = $_[SENDER]->ID();
555             }
556              
557             # Okay, fire off this query!
558 27         336 $kernel->call($_[SESSION] => 'send_query' => $args);
559              
560 27         181 return;
561             }
562              
563             # This subroutine starts the process of sending a query
564             sub send_query {
565 29     29 0 1657 my ($kernel, $heap, $args) = @_[KERNEL,HEAP,ARG0];
566              
567             # Validate that we have something
568 29 50 33     217 if (!defined($args) || !_HASH($args) ) {
569 0         0 return;
570             }
571              
572             # Add the ID to the query
573 29         77 $args->{id} = $heap->{idcounter}++;
574              
575             # Add this query to the queue
576 29         59 push(@{ $heap->{queue} }, $args);
  29         87  
577              
578             # Send the query!
579 29         112 $kernel->call($_[SESSION], 'check_queue');
580              
581 29         226 return;
582             }
583              
584             # This subroutine is the meat - sends queries to the subprocess
585             sub check_queue {
586 58     58 0 2628 my ($kernel, $heap) = @_[KERNEL,HEAP];
587              
588             # Check if the subprocess is currently active
589 58 100       174 return unless (!$heap->{active});
590              
591             # Check if we have a query in the queue
592 56 100       105 return unless (scalar(@{ $heap->{queue} }) > 0);
  56         212  
593              
594             # shutting down?
595 29 50       103 return unless ($heap->{shutdown} != 2);
596              
597 29 50       87 if ($heap->{opts}{stopwatch}) {
598 29         190 tie $heap->{queue}->[0]->{stopwatch}, 'Time::Stopwatch';
599             }
600              
601             # Copy what we need from the top of the queue
602 29         366 my %queue;
603 29         64 foreach (
604             qw(id sql action placeholders no_cache begin_work commit method)
605 29         111 ,@{$heap->{action_params}->{$heap->{queue}->[0]->{action}}}
606             ) {
607 266 100       582 next unless (defined($heap->{queue}->[0]->{$_}));
608 122         264 $queue{$_} = $heap->{queue}->[0]->{$_};
609             }
610              
611             # Send data only if we are not shutting down...
612             # Set the child to 'active'
613 29         59 $heap->{active} = 1;
614              
615             # Put it in the wheel
616 29         172 $heap->{wheel}->put(\%queue);
617              
618 29         4439 return;
619             }
620              
621             sub print_queue {
622 0     0 0 0 my ($kernel, $heap) = @_[KERNEL,HEAP];
623 0         0 return scalar @{$heap->{queue}};
  0         0  
624             }
625              
626             # This starts the EasyDBI
627             sub start {
628 2     2 0 1620 my ($kernel, $heap) = @_[KERNEL,HEAP];
629              
630             # Set up the alias for ourself
631 2 50       27 $kernel->alias_set($heap->{alias}) if ($heap->{alias} ne '');
632              
633             # Create the wheel
634 2         88 $kernel->call( $_[SESSION] => 'setup_wheel' );
635              
636 2         84 return;
637             }
638              
639             # This sets up the WHEEL
640             sub setup_wheel {
641 2     2 0 116 my ($kernel, $heap) = @_[KERNEL,HEAP];
642              
643             # Are we shutting down?
644 2 50       9 if ($heap->{shutdown}) {
645             # if ($heap->{wheel}) {
646             # $heap->{wheel}->kill();
647             # }
648             # Do not re-create the wheel...
649 0         0 delete $heap->{wheel};
650 0         0 return;
651             }
652              
653             # Check if we should set up the wheel
654 2 50       10 if ($heap->{retries} == $heap->{max_retries}) {
655             die 'POE::Component::EasyDBI tried '.$heap->{max_retries}
656 0         0 .' times to create a Wheel and is giving up...';
657             }
658              
659             my %prog = (
660             'Program' => \&POE::Component::EasyDBI::SubProcess::main,
661 2         11 'ProgramArgs' => [ $heap->{opts} ],
662             );
663              
664 2 50 33     35 if ($heap->{opts}{alt_fork} && $^O ne 'MSWin32') {
665 0 0       0 my $X = $heap->{opts}{alt_fork} ne "1" ? $heap->{opts}{alt_fork} : $^X;
666 0         0 %prog = (
667             'Program' => "$X -MPOE::Component::EasyDBI::SubProcess -I".join(' -I',@INC)
668             ." -e 'POE::Component::EasyDBI::SubProcess::main(1)'",
669             );
670             }
671              
672             # $kernel->sig_child( $heap->{wheel_pid} )
673             # if ( $heap->{wheel_pid} );
674              
675             # Set up the SubProcess we communicate with
676 2 50       39 $heap->{wheel} = POE::Wheel::Run->new(
677             # What we will run in the separate process
678             %prog,
679              
680             # Kill off existing FD's unless we're running in HELL^H^H^H^HMSWin32
681             'CloseOnCall' => ($^O eq 'MSWin32' ? 0 : 1),
682              
683             # Redirect errors to our error routine
684             'ErrorEvent' => 'child_error',
685              
686             # Send child died to our child routine
687             'CloseEvent' => 'child_closed',
688              
689             # Send input from child
690             'StdoutEvent' => 'child_STDOUT',
691              
692             # Send input from child STDERR
693             'StderrEvent' => 'child_STDERR',
694              
695             # Set our filters
696             # Communicate with child via Storable::nfreeze
697             'StdinFilter' => POE::Filter::Reference->new(),
698             # Receive input via Storable::nfreeze
699             'StdoutFilter' => POE::Filter::Reference->new(),
700             # Plain ol' error lines
701             'StderrFilter' => POE::Filter::Line->new(),
702             );
703              
704 2         13293 $heap->{wheel_pid} = $heap->{wheel}->PID();
705              
706 2 50       124 if ( $kernel->can( "sig_child" ) ) {
707 2         87 $kernel->sig_child( $heap->{wheel_pid} => 'sig_child' );
708             } else {
709 0         0 $kernel->sig( 'CHLD', 'sig_child' );
710             }
711              
712             # Check for errors
713 2 50       839 if (! defined $heap->{wheel}) {
714 0         0 die 'Unable to create a new wheel!';
715             } else {
716             # Increment our retry count
717 2         30 $heap->{retries}++;
718              
719             # Set the wheel to inactive
720 2         31 $heap->{active} = 0;
721              
722 2 50       39 if ($heap->{opts}{alt_fork}) {
723 0         0 $heap->{wheel}->put($heap->{opts});
724             }
725              
726             # Check for queries
727 2         90 $kernel->call($_[SESSION], 'check_queue');
728             }
729              
730 2         106 return;
731             }
732              
733       2 0   sub stop {
734             # nothing to see here, move along
735             }
736              
737             # Deletes a query from the queue, if it is not active
738             sub delete_query {
739 0     0 0 0 my ($kernel, $heap) = @_[KERNEL,HEAP];
740             # ARG0 = ID
741 0         0 my $id = $_[ARG0];
742              
743             # Validation
744 0 0       0 if (!defined($id)) {
745             # Debugging
746 0         0 DEBUG && warn 'In Delete_Query event with no arguments!';
747 0         0 return;
748             }
749              
750             # Check if the id exists + not at the top of the queue :)
751 0 0       0 if (defined($heap->{queue}->[0])) {
752 0 0       0 if ($heap->{queue}->[0]->{id} eq $id) {
753             # Query is still active, nothing we can do...
754 0         0 return undef;
755             } else {
756             # Search through the rest of the queue and see what we get
757 0         0 foreach my $count (@{ $heap->{queue} }) {
  0         0  
758 0 0       0 if ($heap->{queue}->[$count]->{id} eq $id) {
759             # Found a match, delete it!
760 0         0 splice(@{ $heap->{queue} }, $count, 1);
  0         0  
761              
762             # Return success
763 0         0 return 1;
764             }
765             }
766             }
767             }
768              
769             # If we got here, we didn't find anything
770 0         0 return undef;
771             }
772              
773             # Handles child DIE'ing
774             sub child_closed {
775 0     0 0 0 my ($kernel, $heap) = @_[KERNEL,HEAP];
776              
777 0         0 DEBUG && warn 'POE::Component::EasyDBI\'s Wheel closed';
778 0 0       0 if ($heap->{shutdown}) {
779             # if ($heap->{wheel}) {
780             # $heap->{wheel}->kill();
781             # }
782 0         0 delete $heap->{wheel};
783 0         0 return;
784             }
785              
786             # Emit debugging information
787 0         0 DEBUG && warn 'Restarting it...';
788              
789             # Create the wheel again
790 0         0 delete $heap->{wheel};
791 0         0 $kernel->call($_[SESSION], 'setup_wheel');
792              
793 0         0 return;
794             }
795              
796             # Handles child error
797             sub child_error {
798 2     2 0 1875 my $heap = $_[HEAP];
799              
800             # Emit warnings only if debug is on
801 2         5 DEBUG && do {
802             # Copied from POE::Wheel::Run manpage
803             my ($operation, $errnum, $errstr) = @_[ARG0 .. ARG2];
804             warn "POE::Component::EasyDBI got an $operation error $errnum from Subprocess: '$errstr' shutdown: $heap->{shutdown}\n";
805             };
806              
807 2 50       10 if ($heap->{shutdown}) {
808 2 50       9 if ($heap->{wheel}) {
809 2         12 $heap->{wheel}->kill();
810             }
811 2         123 delete $heap->{wheel};
812 2         1219 return;
813             }
814             }
815              
816             # Handles child STDOUT output
817             sub child_STDOUT {
818 29     29 0 17018400 my ($self, $kernel, $heap, $data) = @_[OBJECT,KERNEL,HEAP,ARG0];
819              
820             # Validate the argument
821 29 50       142 unless ( _HASH($data) ) {
822 0         0 warn "POE::Component::EasyDBI did not get a hash from the child ( $data )";
823 0         0 return;
824             }
825              
826              
827             # DEBUG && do {
828             # require Data::Dumper;
829             # print Data::Dumper->Dump([$data,$heap->{queue}[0]]);
830             # };
831              
832             # Check for special DB messages with ID of 'DBI'
833 29 50       162 if ($data->{id} eq 'DBI') {
834             # Okay, we received a DBI error -> error in connection...
835 0 0       0 if ($heap->{no_connect_failures}) {
836 0         0 my $qc = {};
837 0 0       0 if (defined($heap->{queue}->[0])) {
838 0         0 $qc = $heap->{queue}[0];
839             }
840 0         0 $qc->{error} = $data->{error};
841 0 0 0     0 if (_ARRAY($heap->{opts}{connect_error})) {
    0          
842 0         0 $kernel->post(@{$heap->{opts}{connect_error}}, $qc);
  0         0  
843 0 0       0 if (_HASH($heap->{queue}[0])) {
844 0         0 delete $heap->{queue}[0]->{error};
845             }
846             } elsif ($qc->{session} && $qc->{event}) {
847 0 0 0     0 if ( reftype( $qc->{event} ) && reftype( $qc->{event} ) eq 'CODE' ) {
848 0         0 my $callback = delete $qc->{event};
849 0         0 $callback->($qc);
850             } else {
851 0         0 $kernel->post($qc->{session}, $qc->{event}, $qc);
852             }
853             } else {
854 0         0 warn "No connect_error defined and no queries in the queue while "
855             ."error occurred: $data->{error}";
856             }
857             # print "DBI error: $data->{error}, retrying\n";
858 0         0 return;
859             }
860              
861             # Shutdown ourself!
862 0         0 $kernel->call($_[SESSION], 'shutdown', 'NOW');
863              
864             # Too bad that we have to die...
865 0         0 croak("Could not connect to DBI or database went away: $data->{error}");
866             }
867              
868 29 100       122 if ($data->{id} eq 'DBI-CONNECTED') {
869 2 50       67 if (_ARRAY($heap->{opts}{connected})) {
870 2         21 my $query_copy = {};
871 2 50       32 if (defined($heap->{queue}->[0])) {
872 0         0 $query_copy = { %{$heap->{queue}[0]} };
  0         0  
873             }
874 2         11 $kernel->post(@{$heap->{opts}{connected}}, $query_copy);
  2         40  
875             }
876 2         540 return;
877             }
878              
879 27         45 my $query;
880 27         44 my $refcount_decrement = 0;
881              
882 27 50       74 if (exists($data->{chunked})) {
883             # Get the query from the queue
884 0 0       0 if (exists($data->{last_chunk})) {
885             # last chunk, delete it out of the queue
886 0         0 $query = shift(@{ $heap->{queue} });
  0         0  
887 0         0 $refcount_decrement = 1;
888             } else {
889 0         0 $query = $heap->{queue}->[0];
890             }
891             } else {
892             # Check to see if the ID matches with the top of the queue
893 27 50       109 if ($data->{id} ne $heap->{queue}->[0]->{id}) {
894 0         0 die "Internal error in queue/child consistency! ( CHILD: $data->{id} "
895             ."QUEUE: $heap->{queue}->[0]->{id} )";
896             }
897             # Get the query from the top of the queue
898 27         53 $query = shift(@{ $heap->{queue} });
  27         78  
899 27         47 $refcount_decrement = 1;
900             }
901              
902             # copy the query data, so we don't clobber the
903             # stored data when using chunks
904             #my $query_copy = { %{ $query } };
905              
906             # marry data from the child to the data from the queue
907             #%$query_copy = (%$query_copy, %$data);
908              
909             # my $query_copy = { (%$query, %$data) };
910              
911             # my $query_copy = $query;
912             # foreach (keys %$data) { $query_copy->{$_} = $data->{$_}; }
913              
914 27         298 my $query_copy = { %$query, %$data };
915              
916             # undocumented
917             $poe_kernel->call( $self->{query_logger} => _log => $query_copy )
918 27 50       406 if ( $self->{query_logger} );
919              
920             # my ($ses,$evt) = ("$query_copy->{session}", "$query_copy->{event}");
921              
922             # $kernel->call($ses => $evt => $query_copy);
923              
924             #undef $query;
925             #foreach my $k (keys %$data) {
926             # $query_copy->{$k} = $data->{$k};
927             #}
928              
929 27 100 66     187 if ( reftype( $query_copy->{event} ) && reftype( $query_copy->{event} ) eq 'CODE' ) {
930 7         17 DEBUG && print "calling callback\n";
931 7         22 my $callback = delete $query_copy->{event};
932 7         41 $callback->($query_copy);
933             } else {
934             #DEBUG && print "calling event $query->{event} in session $query->{session} from our session ".$_[SESSION]->ID."\n";
935 20         69 $kernel->call($query_copy->{session} => $query_copy->{event} => $query_copy);
936             }
937              
938             # Decrement the refcount for the session that sent us a query
939 27 50       12703 if ($refcount_decrement) {
940 27         67 $heap->{active} = 0;
941 27         93 $kernel->refcount_decrement($query_copy->{session}, 'EasyDBI');
942              
943 27 100 66     1351 if (defined($query_copy->{sendersession}) && $query_copy->{sendersession} ne $query_copy->{session}) {
944 4         19 $kernel->refcount_decrement($query_copy->{sendersession}, 'EasyDBI');
945             }
946              
947             # Now, that we have got a result, check if we need to send another query
948 27         186 $kernel->call($_[SESSION], 'check_queue');
949             }
950              
951 27         370 return;
952             }
953              
954             # Handles child STDERR output
955             sub child_STDERR {
956 0     0 0 0 my $input = $_[ARG0];
957              
958             # Skip empty lines as the POE::Filter::Line manpage says...
959 0 0       0 if ($input eq '') { return }
  0         0  
960              
961 0 0 0     0 return if ($_[HEAP]->{opts}->{no_warnings} && !DEBUG);
962              
963 0         0 warn "$input\n";
964             }
965              
966             sub sig_child {
967 2     2 0 5516 my ($kernel, $heap) = @_[KERNEL, HEAP];
968              
969 2         6 delete $heap->{wheel_pid};
970 2         18 $kernel->sig_handled();
971              
972 2 50       35 if ( $heap->{shutdown} ) {
973             # Remove our alias so we can be properly terminated
974 2 50       13 $kernel->alias_remove($heap->{alias}) if ($heap->{alias} ne '');
975             # and the child
976 2         119 $kernel->sig( 'CHLD' );
977             }
978             }
979              
980             # ----------------
981             # Object methods
982              
983             sub ID {
984 1     1 1 4 my $self = shift;
985 1         48 return $self->{ID};
986             }
987              
988             # not called directly
989             sub DESTROY {
990 1     1   306 my $self = shift;
991 1 50 33     12 if (ref($self) && $self->ID) {
992             # $poe_kernel->post($self->ID => 'shutdown' => @_);
993             } else {
994 0           return undef;
995             }
996             }
997              
998             # End of module
999             1;
1000              
1001             # egg: I like Sealab 2021. This is why you can see lots of 2021 refences in my code.
1002              
1003             #=item C
1004             #
1005             #Optional. If set to a true value, it will install a signal handler that will
1006             #call $sth->cancel when a INT signal is received by the sub-process. You may
1007             #want to read the docs on your driver to see if this is supported.
1008              
1009             __END__