File Coverage

blib/lib/POE/Component/EasyDBI.pm
Criterion Covered Total %
statement 232 390 59.4
branch 88 204 43.1
condition 20 75 26.6
subroutine 30 35 85.7
pod 1 19 5.2
total 371 723 51.3


line stmt bran cond sub pod time code
1             package POE::Component::EasyDBI;
2              
3 2     2   255065 use strict;
  2         5  
  2         66  
4 2     2   8 use warnings FATAL =>'all';
  2         1  
  2         89  
5              
6             # Initialize our version
7             our $VERSION = '1.28';
8              
9             # Import what we need from the POE namespace
10 2     2   7 use POE;
  2         2  
  2         11  
11 2     2   501 use POE::Session;
  2         3  
  2         8  
12 2     2   1016 use POE::Filter::Reference;
  2         11666  
  2         47  
13 2     2   950 use POE::Filter::Line;
  2         2292  
  2         48  
14 2     2   1253 use POE::Wheel::Run;
  2         27208  
  2         59  
15 2     2   1082 use POE::Component::EasyDBI::SubProcess;
  2         5  
  2         78  
16              
17 2     2   1094 use Params::Util qw( _ARRAY _HASH );
  2         4079  
  2         133  
18 2     2   10 use Scalar::Util qw( reftype );
  2         3  
  2         63  
19              
20              
21             # Miscellaneous modules
22 2     2   8 use Carp;
  2         2  
  2         82  
23 2     2   7 use vars qw($AUTOLOAD);
  2         3  
  2         6752  
24              
25             our %COMMANDS = map { $_ => 1 } qw(
26             commit
27             rollback
28             begin_work
29             func
30             method
31             insert
32             do
33             single
34             quote
35             arrayhash
36             hashhash
37             hasharray
38             array
39             arrayarray
40             hash
41             keyvalhash
42             shutdown
43             combo
44             );
45              
46             sub AUTOLOAD {
47 13     13   4496 my $self = shift;
48 13         23 my $method = $AUTOLOAD;
49 13         83 $method =~ s/.*:://;
50 13 50       39 my $call = ( $method =~ s/^_// ) ? 1 : 0;
51            
52 13 50       53 return unless $method =~ /[^A-Z]/;
53            
54 13         21 $method = lc( $method );
55            
56 13 50       38 croak "EasyDBI command $method does not exist"
57             unless ( exists( $COMMANDS{ $method } ) );
58              
59 13 100       123 my @args = ( $method eq 'shutdown' ) ? @_ : { @_ };
60 13 50       25 if ( $call ) {
61 0         0 $poe_kernel->call($self->{ID} => $method => @args);
62             } else {
63 13         46 $poe_kernel->post($self->{ID} => $method => @args);
64             }
65             }
66              
67             # TODO use constants?
68             sub MAX_RETRIES () { 5 }
69             sub DEBUG () { 0 }
70              
71             # Autoflush on STDOUT
72             # Select returns last selected handle
73             # So, reselect it after selecting STDOUT and setting Autoflush
74             #select((select(STDOUT), $| = 1)[0]);
75              
76             sub spawn {
77 1     1 0 1370 my ($self,$session) = &new;
78 1         3 return $session;
79             }
80              
81             sub create {
82 0     0 0 0 &spawn;
83             }
84              
85             sub new {
86 2     2 0 1417 my $class = shift;
87              
88             # The options hash
89 2         4 my %opt;
90              
91             # Support passing in a hash ref or a regular hash
92 2 50 33     11 if ((@_ & 1) && _HASH($_[0])) {
93 0         0 %opt = %{$_[0]};
  0         0  
94             } else {
95             # Sanity checking
96 2 50       8 if (@_ & 1) {
97 0         0 croak('POE::Component::EasyDBI requires an even number of options '
98             .'passed to new()/spawn() call');
99             }
100 2         23 %opt = @_;
101             }
102              
103             # lowercase keys
104 2         8 %opt = map { lc($_) => $opt{$_} } keys %opt;
  16         31  
105            
106 2         17 my @valid = qw(
107             dsn
108             username
109             password
110             options
111             alias
112             max_retries
113             ping_timeout
114             use_cancel
115             no_connect_failures
116             connect_error
117             reconnect_wait
118             connected
119             no_warnings
120             sig_ignore_off
121             no_cache
122             alt_fork
123             stopwatch
124             query_logger
125             );
126            
127             # check the DSN
128             # username/password/port other options
129             # should be part of the DSN
130 2 50       13 if (!exists($opt{dsn})) {
131 0         0 croak('DSN is required to spawn a new POE::Component::EasyDBI '
132             .'instance!');
133             }
134              
135             # check the USERNAME
136 2 50       6 if (!exists($opt{username})) {
137 0         0 croak('username is required to spawn a new POE::Component::EasyDBI '
138             .'instance!');
139             }
140              
141             # check the PASSWORD
142 2 50       6 if (!exists($opt{password})) {
143 0         0 croak('password is required to spawn a new POE::Component::EasyDBI '
144             .'instance!');
145             }
146              
147             # check the reconnect wait time
148 2 50 33     12 if (exists($opt{reconnect_wait}) && $opt{reconnect_wait} < 1) {
    50          
149 0         0 warn('A reconnect_wait of less than 1 second is NOT recommended, '
150             .'continuing anyway');
151             } elsif (!$opt{reconnect_wait}) {
152 2         5 $opt{reconnect_wait} = 2;
153             }
154              
155             # check the session alias
156 2 50       6 if (!exists($opt{alias})) {
157             # Debugging info...
158 0         0 DEBUG && warn 'Using default Alias EasyDBI';
159              
160             # Set the default
161 0         0 $opt{alias} = 'EasyDBI';
162             }
163            
164             # check for connect error event
165 2 50       6 if (exists($opt{connect_error})) {
166 2 50       13 if (_ARRAY($opt{connect_error})) {
167 2 50       2 unless ($#{$opt{connect_error}} > 0) {
  2         9  
168 0         0 warn('connect_error must be an array reference that contains '
169             .'at least a session and event, ignoring');
170 0         0 delete $opt{connect_error};
171             }
172             } else {
173 0         0 warn('connect_error must be an array reference that contains at '
174             .'least a session and event, ignoring');
175 0         0 delete $opt{connect_error};
176             }
177             }
178            
179             # check for connect event
180 2 50       5 if (exists($opt{connected})) {
181 2 50       7 if (_ARRAY($opt{connected})) {
182 2 50       2 unless ($#{$opt{connected}} > 0) {
  2         8  
183 0         0 warn('connected must be an array reference that contains '
184             .'at least a session and event, ignoring');
185 0         0 delete $opt{connected};
186             }
187             } else {
188 0         0 warn('connected must be an array reference that contains at '
189             .'least a session and event, ignoring');
190 0         0 delete $opt{connected};
191             }
192             }
193              
194 2 50       6 if (exists($opt{options})) {
195 0 0       0 unless (_HASH($opt{options})) {
196 0         0 warn('options must be a hash ref, ignoring');
197 0         0 delete $opt{options};
198             }
199             }
200              
201 2 50       6 if ($opt{stopwatch}) {
202 2     2   12 eval "use Time::Stopwatch";
  2         3  
  2         17  
  2         105  
203 2 50       7 if ($@) {
204 0         0 warn('cannot use stopwatch (ignored), Time::Stopwatch not installed? ');
205 0         0 delete $opt{stopwatch};
206             }
207             }
208            
209 2         4 my $keep = { map { $_ => delete $opt{$_} } @valid };
  36         52  
210              
211             # Anything left over is unrecognized
212 2 50       9 if (keys %opt) {
213 0         0 croak('Unrecognized keys/options ('.join(',',(keys %opt))
214             .') were present in new() call to POE::Component::EasyDBI!');
215             }
216            
217 2   33     14 my $self = bless($keep,ref $class || $class);
218              
219             # Create a new session for ourself
220 36         194 my $session = POE::Session->create(
221             # Our subroutines
222             'object_states' => [
223             $self => {
224             # Maintenance events
225             '_start' => 'start',
226             '_stop' => 'stop',
227             'setup_wheel' => 'setup_wheel',
228             'sig_child' => 'sig_child',
229            
230             # child events
231             'child_error' => 'child_error',
232             'child_closed' => 'child_closed',
233             'child_STDOUT' => 'child_STDOUT',
234             'child_STDERR' => 'child_STDERR',
235            
236             # database events
237 2 50 50     17 (map { $_ => 'db_handler', uc($_) => 'db_handler' } keys %COMMANDS),
      50        
238            
239             # redefine
240             'combo' => 'combo_query',
241             'COMBO' => 'combo_query',
242             'shutdown' => 'shutdown_poco',
243             'SHUTDOWN' => 'shutdown_poco',
244            
245            
246             # Queue handling
247             'send_query' => 'send_query',
248             'check_queue' => 'check_queue',
249             'print_queue' => 'print_queue',
250             },
251             ],
252              
253             # Set up the heap for ourself
254             'heap' => {
255             # The queue of DBI calls
256             'queue' => [],
257             'idcounter' => 0,
258              
259             # The Wheel::Run object placeholder
260             'wheel' => undef,
261              
262             # How many times have we re-created the wheel?
263             'retries' => 0,
264              
265             # Are we shutting down?
266             'shutdown' => 0,
267              
268             # Valid options
269             'opts' => $keep,
270              
271             # The alia/s we will run under
272             'alias' => $keep->{alias},
273              
274             # Number of times to retry connection
275             # (if connection failures option is off)
276             'max_retries' => $keep->{max_retries} || MAX_RETRIES,
277              
278             # Connection failure option
279             'no_connect_failures' => $keep->{no_connect_failures} || 0,
280              
281             # extra params for actions
282             action_params => {
283             commit => [],
284             rollback => [],
285             begin_work => [],
286             func => [qw( args )],
287             method => [qw( function args )],
288             single => [qw( separator )],
289             insert => [qw( insert hash table last_insert_id )],
290             array => [qw( chunked separator )],
291             arrayarray => [qw( chunked )],
292             keyvalhash => [qw( primary_key chunked )],
293             hashhash => [qw( primary_key chunked )],
294             hasharray => [qw( primary_key chunked )],
295             arrayhash => [qw( chunked )],
296             },
297             },
298             ) or die 'Unable to create a new session!';
299            
300             # save the session id
301 2         604 $self->{ID} = $session->ID;
302            
303 2 100       93 return wantarray ? ($self,$session) : $self;
304             }
305              
306             # This subroutine handles shutdown signals
307             sub shutdown_poco {
308 2     2 0 485 my ($kernel, $heap) = @_[KERNEL,HEAP];
309            
310             # Check for duplicate shutdown signals
311 2 50       20 if ($heap->{shutdown}) {
312             # Okay, let's see what's going on
313 0 0 0     0 if ($heap->{shutdown} == 1 && ! defined $_[ARG0]) {
    0          
314             # Duplicate shutdown events
315 0         0 DEBUG && warn 'Duplicate shutdown event fired!';
316 0         0 return;
317             } elsif ($heap->{shutdown} == 2) {
318             # Tried to shutdown_NOW again...
319 0         0 DEBUG && warn 'Duplicate shutdown NOW fired!';
320 0         0 return;
321             }
322             }
323              
324             # Check if we got "NOW"
325 2 50 33     11 if (defined($_[ARG0]) && uc($_[ARG0]) eq 'NOW') {
326             # Actually shut down!
327 0         0 $heap->{shutdown} = 2;
328              
329 0 0       0 if ($heap->{wheel}) {
330             # KILL our subprocess
331 0         0 $heap->{wheel}->kill(9);
332             }
333              
334             # Delete the wheel, so we have nothing to keep
335             # the GC from destructing us...
336 0         0 delete $heap->{wheel};
337              
338             # Go over our queue, and do some stuff
339 0         0 foreach my $queue (@{$heap->{queue}}) {
  0         0  
340             # Skip the special EXIT actions we might have put on the queue
341 0 0       0 if ($queue->{action} eq 'EXIT') { next }
  0         0  
342              
343             # Post a failure event to all the queries on the Queue
344             # informing them that we have been shutdown...
345 0         0 $queue->{error} = 'POE::Component::EasyDBI was '
346             .'shut down forcibly!';
347 0         0 $kernel->post( $queue->{session}, $queue->{event}, $queue);
348              
349             # Argh, decrement the refcount
350 0         0 $kernel->refcount_decrement( $queue->{session}, 'EasyDBI' );
351             }
352              
353             # Tell the kernel to kill us!
354 0         0 $kernel->signal( $_[SESSION], 'KILL' );
355             } else {
356             # Gracefully shut down...
357 2         4 $heap->{shutdown} = 1;
358            
359             # Put into the queue EXIT for the child
360 2         12 $kernel->yield( 'send_query', {
361             action => 'EXIT',
362             sql => undef,
363             placeholders => undef,
364             }
365             );
366             }
367             }
368              
369             sub combo_query {
370 2     2 0 194 my ($kernel, $heap, $args) = @_[KERNEL,HEAP,ARG0];
371              
372             # Get the arguments
373 2 50       8 unless (_HASH($args)) {
374 0         0 $args = { @_[ ARG0 .. $#_ ] };
375             }
376            
377             # Add some stuff to the args
378             # defaults to sender, but can be specified
379 2 50       8 unless (defined($args->{session})) {
380 2         6 $args->{session} = $_[SENDER]->ID();
381 2         7 DEBUG && print "setting session to $args->{session}\n";
382             }
383            
384 2         4 $args->{action} = $_[STATE];
385              
386 2 50       9 if (!exists($args->{event})) {
387             # Nothing much we can do except drop this quietly...
388 0         0 warn "Did not receive an event argument from caller ".$_[SENDER]->ID
389             ." State: " . $_[STATE] . " Args: " . join('',%$args);
390 0         0 return;
391             } else {
392 2         4 my $a = ref($args->{event});
393 2 0 33     8 unless (!ref($a) || $a =~ m/postback/i || $a =~ m/callback/i) {
      33        
394 0         0 warn "Received an malformed event argument from caller"
395             ." (only postbacks, callbacks and scalar allowed) "
396             .$_[SENDER]->ID." -> State: " . $_[STATE] . " Event: $args->{event}"
397             ." Args: " . %$args;
398 0         0 return;
399             }
400             }
401              
402 2         3 my @res;
403             my $handle = sub {
404 4     4   6 push(@res, shift);
405 4 100       20 $poe_kernel->post( $args->{session} => $args->{event} => @res ) if (delete $res[ -1 ]->{__last});
406 2         28 };
407              
408 2         5 foreach my $i ( 0 .. $#{ $args->{queries} } ) {
  2         10  
409 4         9 my ($type, $arg) = %{ $args->{queries}->[ $i ] };
  4         12  
410            
411             # Copy pass-through options
412 4         5 for my $key ( keys %{ $args } ) {
  4         9  
413 16 100 66     63 next if defined $arg->{$key} || $key eq 'queries';
414 12         16 $arg->{$key} = $args->{$key};
415             }
416            
417 4         9 $arg->{event} = $handle;
418 4         4 $arg->{__last} = ( $i == $#{ $args->{queries} } );
  4         13  
419 4         10 $kernel->call( $_[SESSION] => $type => $arg );
420             }
421              
422 2         16 return;
423             }
424              
425             # This subroutine handles queries
426             sub db_handler {
427 27     27 0 3627 my ($kernel, $heap) = @_[KERNEL,HEAP];
428              
429             # Get the arguments
430 27         29 my $args;
431 27 50       78 if (_HASH($_[ARG0])) {
432 27         34 $args = $_[ARG0];
433             } else {
434 0         0 warn "first parameter must be a hash ref, trying to adjust. "
435             ."(fix this to get rid of this message)";
436 0         0 $args = { @_[ARG0 .. $#_ ] };
437             }
438              
439             # fix a stupid spelling mistake
440 27 50       63 if ($args->{seperator}) {
441 0         0 $args->{separator} = $args->{seperator};
442             }
443            
444             # Add some stuff to the args
445             # defaults to sender, but can be specified
446 27 100       55 unless (defined($args->{session})) {
447 23         47 $args->{session} = $_[SENDER]->ID();
448 23         68 DEBUG && print "setting session to $args->{session}\n";
449             }
450            
451 27         39 $args->{action} = $_[STATE];
452              
453 27 50       57 if (!exists($args->{event})) {
454             # Nothing much we can do except drop this quietly...
455 0         0 warn "Did not receive an event argument from caller ".$_[SENDER]->ID
456             ." State: " . $_[STATE] . " Args: " . join(',',%$args);
457 0         0 return;
458             } else {
459 27         46 my $a = ref($args->{event});
460 27 0 33     67 unless (!ref($a) || $a =~ m/postback/i || $a =~ m/callback/i) {
      33        
461 0         0 warn "Received an malformed event argument from caller"
462             ." (only postbacks, callbacks and scalar allowed) "
463             .$_[SENDER]->ID." -> State: " . $_[STATE] . " Event: $args->{event}"
464             ." Args: " . %$args;
465 0         0 return;
466             }
467             }
468              
469 27 100       72 if (!defined($args->{sql})) {
470 6 50       64 unless ($args->{action} =~ m/^(insert|func|method|commit|rollback|begin_work)$/i) {
471 0         0 $args->{error} = 'sql is not defined!';
472             # Okay, send the error to the Failure Event
473 0         0 $kernel->post($args->{session}, $args->{event}, $args);
474 0         0 return;
475             }
476             } else {
477 21 50       47 if (ref($args->{sql})) {
478 0         0 $args->{error} = 'sql is not a scalar!';
479 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
480 0         0 my $callback = delete $args->{event};
481 0         0 $callback->($args);
482             } else {
483             # send the error to the Failure Event
484 0         0 $kernel->post($args->{session}, $args->{event}, $args);
485             }
486 0         0 return;
487             }
488             }
489              
490             # Check for placeholders
491 27 100       52 if (!defined($args->{placeholders})) {
492             # Create our own empty placeholders
493 23         49 $args->{placeholders} = [];
494             } else {
495 4 50       16 unless (_ARRAY($args->{placeholders})) {
496 0         0 $args->{error} = 'placeholders is not an array!';
497 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
498 0         0 my $callback = delete $args->{event};
499 0         0 $callback->($args);
500             } else {
501             # Okay, send the error to the Failure Event
502 0         0 $kernel->post($args->{session}, $args->{event}, $args);
503             }
504 0         0 return;
505             }
506             }
507              
508             # Check for primary_key on HASHHASH or ARRAYHASH queries
509 27 50 33     132 if ($args->{action} eq 'HASHHASH' || $args->{action} eq 'HASHARRAY') {
510 0 0       0 if (!defined($args->{primary_key})) {
511 0         0 $args->{error} = 'primary_key is not defined! It must '
512             .'be a column name or a 1 based index of a column';
513 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
514 0         0 my $callback = delete $args->{event};
515 0         0 $callback->($args);
516             } else {
517 0         0 $kernel->post($args->{session}, $args->{event}, $args);
518             }
519 0         0 return;
520             } else {
521 0         0 $args->{error} = 'primary_key is not a scalar!';
522 0 0       0 if (ref($args->{sql})) {
523             # send the error to the Failure Event
524 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
525 0         0 my $callback = delete $args->{event};
526 0         0 $callback->($args);
527             } else {
528 0         0 $kernel->post($args->{session}, $args->{event}, $args);
529             }
530 0         0 return;
531             }
532             }
533             }
534              
535             # Check if we have shutdown or not
536 27 50       50 if ($heap->{shutdown}) {
537 0         0 $args->{error} = 'POE::Component::EasyDBI is shutting'
538             .' down now, requests are not accepted!';
539             # Do not accept this query
540 0 0 0     0 if ( reftype( $args->{event} ) && reftype( $args->{event} ) eq 'CODE' ) {
541 0         0 my $callback = delete $args->{event};
542 0         0 $callback->($args);
543             } else {
544 0         0 $kernel->post($args->{session}, $args->{event}, $args);
545             }
546 0         0 return;
547             }
548              
549             # Increment the refcount for the session that is sending us this query
550 27         74 $kernel->refcount_increment($args->{session}, 'EasyDBI');
551            
552 27 100       661 if ($args->{session} ne $_[SENDER]->ID()) {
553 4         30 $kernel->refcount_increment($_[SENDER]->ID(), 'EasyDBI');
554 4         75 $args->{sendersession} = $_[SENDER]->ID();
555             }
556              
557             # Okay, fire off this query!
558 27         129 $kernel->call($_[SESSION] => 'send_query' => $args);
559            
560 27         120 return;
561             }
562              
563             # This subroutine starts the process of sending a query
564             sub send_query {
565 29     29 0 930 my ($kernel, $heap, $args) = @_[KERNEL,HEAP,ARG0];
566            
567             # Validate that we have something
568 29 50 33     124 if (!defined($args) || !_HASH($args) ) {
569 0         0 return;
570             }
571              
572             # Add the ID to the query
573 29         55 $args->{id} = $heap->{idcounter}++;
574              
575             # Add this query to the queue
576 29         33 push(@{ $heap->{queue} }, $args);
  29         48  
577              
578             # Send the query!
579 29         76 $kernel->call($_[SESSION], 'check_queue');
580            
581 29         143 return;
582             }
583              
584             # This subroutine is the meat - sends queries to the subprocess
585             sub check_queue {
586 58     58 0 1407 my ($kernel, $heap) = @_[KERNEL,HEAP];
587            
588             # Check if the subprocess is currently active
589 58 100       121 return unless (!$heap->{active});
590              
591             # Check if we have a query in the queue
592 56 100       45 return unless (scalar(@{ $heap->{queue} }) > 0);
  56         156  
593            
594             # shutting down?
595 29 50       62 return unless ($heap->{shutdown} != 2);
596            
597 29 50       62 if ($heap->{opts}{stopwatch}) {
598 29         157 tie $heap->{queue}->[0]->{stopwatch}, 'Time::Stopwatch';
599             }
600              
601             # Copy what we need from the top of the queue
602 29         211 my %queue;
603 29         37 foreach (
604 29         104 qw(id sql action placeholders no_cache begin_work commit method)
605             ,@{$heap->{action_params}->{$heap->{queue}->[0]->{action}}}
606             ) {
607 266 100       456 next unless (defined($heap->{queue}->[0]->{$_}));
608 122         182 $queue{$_} = $heap->{queue}->[0]->{$_};
609             }
610              
611             # Send data only if we are not shutting down...
612             # Set the child to 'active'
613 29         44 $heap->{active} = 1;
614              
615             # Put it in the wheel
616 29         97 $heap->{wheel}->put(\%queue);
617              
618 29         2551 return;
619             }
620              
621             sub print_queue {
622 0     0 0 0 my ($kernel, $heap) = @_[KERNEL,HEAP];
623 0         0 return scalar @{$heap->{queue}};
  0         0  
624             }
625              
626             # This starts the EasyDBI
627             sub start {
628 2     2 0 895 my ($kernel, $heap) = @_[KERNEL,HEAP];
629            
630             # Set up the alias for ourself
631 2 50       11 $kernel->alias_set($heap->{alias}) if ($heap->{alias} ne '');
632            
633             # Create the wheel
634 2         50 $kernel->call( $_[SESSION] => 'setup_wheel' );
635            
636 2         36 return;
637             }
638              
639             # This sets up the WHEEL
640             sub setup_wheel {
641 2     2 0 88 my ($kernel, $heap) = @_[KERNEL,HEAP];
642            
643             # Are we shutting down?
644 2 50       6 if ($heap->{shutdown}) {
645             # if ($heap->{wheel}) {
646             # $heap->{wheel}->kill();
647             # }
648             # Do not re-create the wheel...
649 0         0 delete $heap->{wheel};
650 0         0 return;
651             }
652              
653             # Check if we should set up the wheel
654 2 50       7 if ($heap->{retries} == $heap->{max_retries}) {
655 0         0 die 'POE::Component::EasyDBI tried '.$heap->{max_retries}
656             .' times to create a Wheel and is giving up...';
657             }
658              
659 2         12 my %prog = (
660             'Program' => \&POE::Component::EasyDBI::SubProcess::main,
661             'ProgramArgs' => [ $heap->{opts} ],
662             );
663              
664 2 50 33     9 if ($heap->{opts}{alt_fork} && $^O ne 'MSWin32') {
665 0 0       0 my $X = $heap->{opts}{alt_fork} ne "1" ? $heap->{opts}{alt_fork} : $^X;
666 0         0 %prog = (
667             'Program' => "$X -MPOE::Component::EasyDBI::SubProcess -I".join(' -I',@INC)
668             ." -e 'POE::Component::EasyDBI::SubProcess::main(1)'",
669             );
670             }
671              
672             # $kernel->sig_child( $heap->{wheel_pid} )
673             # if ( $heap->{wheel_pid} );
674            
675             # Set up the SubProcess we communicate with
676 2 50       28 $heap->{wheel} = POE::Wheel::Run->new(
677             # What we will run in the separate process
678             %prog,
679              
680             # Kill off existing FD's unless we're running in HELL^H^H^H^HMSWin32
681             'CloseOnCall' => ($^O eq 'MSWin32' ? 0 : 1),
682              
683             # Redirect errors to our error routine
684             'ErrorEvent' => 'child_error',
685              
686             # Send child died to our child routine
687             'CloseEvent' => 'child_closed',
688              
689             # Send input from child
690             'StdoutEvent' => 'child_STDOUT',
691              
692             # Send input from child STDERR
693             'StderrEvent' => 'child_STDERR',
694              
695             # Set our filters
696             # Communicate with child via Storable::nfreeze
697             'StdinFilter' => POE::Filter::Reference->new(),
698             # Receive input via Storable::nfreeze
699             'StdoutFilter' => POE::Filter::Reference->new(),
700             # Plain ol' error lines
701             'StderrFilter' => POE::Filter::Line->new(),
702             );
703              
704 2         6921 $heap->{wheel_pid} = $heap->{wheel}->PID();
705            
706 2 50       68 if ( $kernel->can( "sig_child" ) ) {
707 2         40 $kernel->sig_child( $heap->{wheel_pid} => 'sig_child' );
708             } else {
709 0         0 $kernel->sig( 'CHLD', 'sig_child' );
710             }
711              
712             # Check for errors
713 2 50       447 if (! defined $heap->{wheel}) {
714 0         0 die 'Unable to create a new wheel!';
715             } else {
716             # Increment our retry count
717 2         13 $heap->{retries}++;
718              
719             # Set the wheel to inactive
720 2         21 $heap->{active} = 0;
721              
722 2 50       10 if ($heap->{opts}{alt_fork}) {
723 0         0 $heap->{wheel}->put($heap->{opts});
724             }
725              
726             # Check for queries
727 2         37 $kernel->call($_[SESSION], 'check_queue');
728             }
729            
730 2         51 return;
731             }
732              
733 2     2 0 619 sub stop {
734             # nothing to see here, move along
735             }
736              
737             # Deletes a query from the queue, if it is not active
738             sub delete_query {
739 0     0 0 0 my ($kernel, $heap) = @_[KERNEL,HEAP];
740             # ARG0 = ID
741 0         0 my $id = $_[ARG0];
742              
743             # Validation
744 0 0       0 if (!defined($id)) {
745             # Debugging
746 0         0 DEBUG && warn 'In Delete_Query event with no arguments!';
747 0         0 return;
748             }
749              
750             # Check if the id exists + not at the top of the queue :)
751 0 0       0 if (defined($heap->{queue}->[0])) {
752 0 0       0 if ($heap->{queue}->[0]->{id} eq $id) {
753             # Query is still active, nothing we can do...
754 0         0 return undef;
755             } else {
756             # Search through the rest of the queue and see what we get
757 0         0 foreach my $count (@{ $heap->{queue} }) {
  0         0  
758 0 0       0 if ($heap->{queue}->[$count]->{id} eq $id) {
759             # Found a match, delete it!
760 0         0 splice(@{ $heap->{queue} }, $count, 1);
  0         0  
761              
762             # Return success
763 0         0 return 1;
764             }
765             }
766             }
767             }
768              
769             # If we got here, we didn't find anything
770 0         0 return undef;
771             }
772              
773             # Handles child DIE'ing
774             sub child_closed {
775 0     0 0 0 my ($kernel, $heap) = @_[KERNEL,HEAP];
776            
777 0         0 DEBUG && warn 'POE::Component::EasyDBI\'s Wheel closed';
778 0 0       0 if ($heap->{shutdown}) {
779             # if ($heap->{wheel}) {
780             # $heap->{wheel}->kill();
781             # }
782 0         0 delete $heap->{wheel};
783 0         0 return;
784             }
785              
786             # Emit debugging information
787 0         0 DEBUG && warn 'Restarting it...';
788              
789             # Create the wheel again
790 0         0 delete $heap->{wheel};
791 0         0 $kernel->call($_[SESSION], 'setup_wheel');
792            
793 0         0 return;
794             }
795              
796             # Handles child error
797             sub child_error {
798 2     2 0 2912 my $heap = $_[HEAP];
799            
800             # Emit warnings only if debug is on
801 2         4 DEBUG && do {
802             # Copied from POE::Wheel::Run manpage
803             my ($operation, $errnum, $errstr) = @_[ARG0 .. ARG2];
804             warn "POE::Component::EasyDBI got an $operation error $errnum from Subprocess: '$errstr' shutdown: $heap->{shutdown}\n";
805             };
806            
807 2 50       10 if ($heap->{shutdown}) {
808 2 50       8 if ($heap->{wheel}) {
809 2         59 $heap->{wheel}->kill();
810             }
811 2         95 delete $heap->{wheel};
812 2         1016 return;
813             }
814             }
815              
816             # Handles child STDOUT output
817             sub child_STDOUT {
818 29     29 0 2470854 my ($self, $kernel, $heap, $data) = @_[OBJECT,KERNEL,HEAP,ARG0];
819            
820             # Validate the argument
821 29 50       105 unless ( _HASH($data) ) {
822 0         0 warn "POE::Component::EasyDBI did not get a hash from the child ( $data )";
823 0         0 return;
824             }
825              
826              
827             # DEBUG && do {
828             # require Data::Dumper;
829             # print Data::Dumper->Dump([$data,$heap->{queue}[0]]);
830             # };
831              
832             # Check for special DB messages with ID of 'DBI'
833 29 50       74 if ($data->{id} eq 'DBI') {
834             # Okay, we received a DBI error -> error in connection...
835              
836 0 0       0 if ($heap->{no_connect_failures}) {
837 0         0 my $qc = {};
838 0 0       0 if (defined($heap->{queue}->[0])) {
839 0         0 $qc = $heap->{queue}[0];
840             }
841 0         0 $qc->{error} = $data->{error};
842 0 0 0     0 if (_ARRAY($heap->{opts}{connect_error})) {
    0          
843 0         0 $kernel->post(@{$heap->{opts}{connect_error}}, $qc);
  0         0  
844 0         0 delete $heap->{queue}[0]->{error};
845             } elsif ($qc->{session} && $qc->{event}) {
846 0 0 0     0 if ( reftype( $qc->{event} ) && reftype( $qc->{event} ) eq 'CODE' ) {
847 0         0 my $callback = delete $qc->{event};
848 0         0 $callback->($qc);
849             } else {
850 0         0 $kernel->post($qc->{session}, $qc->{event}, $qc);
851             }
852             } else {
853 0         0 warn "No connect_error defined and no queries in the queue while "
854             ."error occurred: $data->{error}";
855             }
856             # print "DBI error: $data->{error}, retrying\n";
857 0         0 return;
858             }
859            
860             # Shutdown ourself!
861 0         0 $kernel->call($_[SESSION], 'shutdown', 'NOW');
862              
863             # Too bad that we have to die...
864 0         0 croak("Could not connect to DBI or database went away: $data->{error}");
865             }
866              
867 29 100       56 if ($data->{id} eq 'DBI-CONNECTED') {
868 2 50       15 if (_ARRAY($heap->{opts}{connected})) {
869 2         5 my $query_copy = {};
870 2 50       9 if (defined($heap->{queue}->[0])) {
871 0         0 $query_copy = { %{$heap->{queue}[0]} };
  0         0  
872             }
873 2         8 $kernel->post(@{$heap->{opts}{connected}}, $query_copy);
  2         16  
874             }
875 2         253 return;
876             }
877              
878 27         22 my $query;
879 27         27 my $refcount_decrement = 0;
880            
881 27 50       65 if (exists($data->{chunked})) {
882             # Get the query from the queue
883 0 0       0 if (exists($data->{last_chunk})) {
884             # last chunk, delete it out of the queue
885 0         0 $query = shift(@{ $heap->{queue} });
  0         0  
886 0         0 $refcount_decrement = 1;
887             } else {
888 0         0 $query = $heap->{queue}->[0];
889             }
890             } else {
891             # Check to see if the ID matches with the top of the queue
892 27 50       118 if ($data->{id} ne $heap->{queue}->[0]->{id}) {
893 0         0 die "Internal error in queue/child consistency! ( CHILD: $data->{id} "
894             ."QUEUE: $heap->{queue}->[0]->{id} )";
895             }
896             # Get the query from the top of the queue
897 27         25 $query = shift(@{ $heap->{queue} });
  27         42  
898 27         34 $refcount_decrement = 1;
899             }
900              
901             # copy the query data, so we don't clobber the
902             # stored data when using chunks
903             #my $query_copy = { %{ $query } };
904            
905             # marry data from the child to the data from the queue
906             #%$query_copy = (%$query_copy, %$data);
907            
908             # my $query_copy = { (%$query, %$data) };
909            
910             # my $query_copy = $query;
911             # foreach (keys %$data) { $query_copy->{$_} = $data->{$_}; }
912              
913 27         205 my $query_copy = { %$query, %$data };
914            
915             # undocumented
916 27 50       299 $poe_kernel->call( $self->{query_logger} => _log => $query_copy )
917             if ( $self->{query_logger} );
918              
919             # my ($ses,$evt) = ("$query_copy->{session}", "$query_copy->{event}");
920            
921             # $kernel->call($ses => $evt => $query_copy);
922            
923             #undef $query;
924             #foreach my $k (keys %$data) {
925             # $query_copy->{$k} = $data->{$k};
926             #}
927            
928 27 100 66     135 if ( reftype( $query_copy->{event} ) && reftype( $query_copy->{event} ) eq 'CODE' ) {
929 7         9 DEBUG && print "calling callback\n";
930 7         17 my $callback = delete $query_copy->{event};
931 7         28 $callback->($query_copy);
932             } else {
933             #DEBUG && print "calling event $query->{event} in session $query->{session} from our session ".$_[SESSION]->ID."\n";
934 20         50 $kernel->call($query_copy->{session} => $query_copy->{event} => $query_copy);
935             }
936              
937             # Decrement the refcount for the session that sent us a query
938 27 50       7714 if ($refcount_decrement) {
939 27         42 $heap->{active} = 0;
940 27         72 $kernel->refcount_decrement($query_copy->{session}, 'EasyDBI');
941            
942 27 100 66     745 if (defined($query_copy->{sendersession}) && $query_copy->{sendersession} ne $query_copy->{session}) {
943 4         12 $kernel->refcount_decrement($query_copy->{sendersession}, 'EasyDBI');
944             }
945              
946             # Now, that we have got a result, check if we need to send another query
947 27         111 $kernel->call($_[SESSION], 'check_queue');
948             }
949              
950 27         256 return;
951             }
952              
953             # Handles child STDERR output
954             sub child_STDERR {
955 0     0 0 0 my $input = $_[ARG0];
956              
957             # Skip empty lines as the POE::Filter::Line manpage says...
958 0 0       0 if ($input eq '') { return }
  0         0  
959              
960 0 0 0     0 return if ($_[HEAP]->{opts}->{no_warnings} && !DEBUG);
961              
962 0         0 warn "$input\n";
963             }
964              
965             sub sig_child {
966 2     2 0 1697 my ($kernel, $heap) = @_[KERNEL, HEAP];
967            
968 2         5 delete $heap->{wheel_pid};
969 2         11 $kernel->sig_handled();
970            
971 2 50       30 if ( $heap->{shutdown} ) {
972             # Remove our alias so we can be properly terminated
973 2 50       16 $kernel->alias_remove($heap->{alias}) if ($heap->{alias} ne '');
974             # and the child
975 2         93 $kernel->sig( 'CHLD' );
976             }
977             }
978              
979             # ----------------
980             # Object methods
981              
982             sub ID {
983 1     1 1 3 my $self = shift;
984 1         40 return $self->{ID};
985             }
986              
987             # not called directly
988             sub DESTROY {
989 1     1   291 my $self = shift;
990 1 50 33     10 if (ref($self) && $self->ID) {
991             # $poe_kernel->post($self->ID => 'shutdown' => @_);
992             } else {
993 0           return undef;
994             }
995             }
996              
997             # End of module
998             1;
999              
1000             # egg: I like Sealab 2021. This is why you can see lots of 2021 refences in my code.
1001              
1002             #=item C
1003             #
1004             #Optional. If set to a true value, it will install a signal handler that will
1005             #call $sth->cancel when a INT signal is received by the sub-process. You may
1006             #want to read the docs on your driver to see if this is supported.
1007              
1008             __END__