File Coverage

blib/lib/POE/Component/DBIAgent/Helper.pm
Criterion Covered Total %
statement 27 94 28.7
branch 1 26 3.8
condition 0 10 0.0
subroutine 8 11 72.7
pod 0 2 0.0
total 36 143 25.1


line stmt bran cond sub pod time code
1             package POE::Component::DBIAgent::Helper;
2              
3 1     1   154296 use DBI;
  1         28239  
  1         89  
4             #use Daemon; # qw//;
5 1     1   1455 use Data::Dumper;
  1         11293  
  1         93  
6 1     1   10 use POE::Filter::Reference;
  1         2  
  1         96  
7              
8             BEGIN {
9 1     1   3 my $can_delay = 0;
10 1         2 eval { require Time::HiRes; };
  1         6  
11 1 50       6 unless ($@) {
12 1         9 Time::HiRes->import(qw/usleep/);
13 1         249 $can_delay = 1;
14             }
15 0     0 0   sub CAN_DELAY { $can_delay }
16              
17             }
18 1     1   7 use strict;
  1         2  
  1         40  
19              
20 1     1   6 use vars qw/$VERSION/;
  1         3  
  1         123  
21             $VERSION = sprintf("%d.%02d", q$Revision: 0.03 $ =~ /(\d+)\.(\d+)/);
22              
23 1     1   7 use constant DEBUG => 0;
  1         2  
  1         74  
24 1     1   5 use constant DEBUG_NOUPDATE => 0;
  1         2  
  1         1198  
25              
26             my $filter = POE::Filter::Reference->new();
27              
28             sub run {
29              
30 0     0 0   DEBUG && warn " QA: start\n";
31 0           DEBUG_NOUPDATE && warn " QA: NO UPDATE\n";
32              
33 0           my ($type, $dsn, $queries) = @_;
34              
35 0           my $self = bless {}, $type;
36 0           $self->_init_dbi($dsn, $queries);
37              
38 0           $| = 1;
39              
40 0           $self->{dbh}->{RaiseError} = 0;
41 0           $self->{dbh}->{PrintError} = 0;
42              
43 0           DEBUG && warn " QA: initialized\n";
44              
45 0           my ($row, $output); # to hold DBI results
46 0           while ( sysread( STDIN, my $buffer = '', 1024 ) ) {
47 0           my $lines = $filter->get( [ $buffer ] );
48              
49             #++ look for the exit sign in the current set of commands
50 0           my $exit = grep /^EXIT$/, map $_->{query}, @$lines;
51             ### DEBUG && warn "Exit? - ", $exit, "\n";
52              
53 0           foreach my $task (@$lines) {
54             ### DEBUG && warn " QA: Got line: ", Dumper($task), "\n";
55              
56             #++ this doesn't match what DBIAgent::Queue sends in exit_all();
57             # last if /^EXIT$/; # allow parent to tell us to exit
58              
59             # Set up query
60 0           my ($query_id);
61 0           $query_id = $task->{query};
62 0 0         my $rowtype = $task->{hash} ? 'fetchrow_hashref' : 'fetchrow_arrayref';
63              
64 0 0 0       if ($query_id eq 'CREATE' or $query_id eq 'EXIT') {
65             #++ make sure the EXIT event isn't actually sent to the db
66 0           next;
67             }
68              
69             ### DEBUG && warn " QA: Read data: $query_id for $task->{state} (params @{$task->{params}})\n";
70              
71 0 0         unless (exists $self->{$query_id}) {
72 0           DEBUG && warn " QA: No such query: $query_id";
73 0           next;
74             }
75 0           DEBUG && warn " QA: query $query_id exists\n";
76              
77 0           my $rowcount = 0;
78              
79 0   0       my $result = { package => $task->{package}, state => $task->{state},
80             data => undef,
81             query => $query_id,
82             id => $task->{id},
83             cookie => $task->{cookie} || undef, # XXX remove?
84             group => $task->{group},
85             };
86              
87 0 0         if (ref $self->{$query_id}) { # Is it a DBI statement handle?
88              
89             # Normal query loop. This is where we usually go.
90 0 0         unless ( $self->{$query_id}->execute( @{$task->{params}} ) ) {
  0            
91 0           DEBUG && warn " QA: error executing query: ", $self->{$query_id}->errstr,"\n";
92              
93             # this goes to stderr. If an ErrorState was
94             # supplied, the user will see this message.
95 0           warn "QA: error executing query: ", $self->{$query_id}->errstr,"\n";
96              
97 0           $result->{data} = 'EOF';
98 0           $output = $filter->put( [ $result ] );
99 0           print @$output;
100              
101             #print "ERROR|", $self->{$query_id}->errstr, "\n";
102             } else {
103 0           DEBUG && warn " QA: query running\n";
104              
105 0 0         if ($self->{$query_id}{Active}) {
106 0           while (defined ($row = $self->{$query_id}->$rowtype())) {
107              
108 0           $rowcount++;
109              
110 0           $result->{data} = $row;
111 0           $output = $filter->put( [ $result ] );
112              
113             # This prevents monopolizing the parent with
114             # db responses.
115 0 0 0       CAN_DELAY and $task->{delay} and usleep(1);
116              
117 0           print @$output;
118             #warn " QA: got row $rowcount: ",,"\n";
119              
120             }
121             }
122              
123 0           $result->{data} = 'EOF';
124 0           $output = $filter->put( [ $result ] );
125 0           print @$output;
126 0           DEBUG && warn " QA: ROWS|$rowcount\n";
127              
128             }
129              
130             } else { # *NOT* a DBI statement handle
131              
132             # $queries->{$query_id} is a STRING query. This is a
133             # debug feature. Print a debug message, and send back
134             # EOF, but don't actually touch the database.
135 0           my $query = $queries->{$query_id};
136              
137 0           my @params = @{$task->{params}};
  0            
138             # Replace ? placeholders with bind values.
139 0           $query =~ s/\?/@params/eg;
  0            
140              
141 0           DEBUG && warn " QA: $query\n";
142              
143 0           $result->{data} = 'EOF';
144 0           $output = $filter->put( [ $result ] );
145 0           print @$output;
146              
147             }
148             }
149             #++ put here to make sure all the queries in the current buffer are dealt with before disconnecting
150 0 0         last if $exit;
151             }
152              
153 0           DEBUG && warn " QA: Disconnect and Exit\n";
154 0           $self->{dbh}->disconnect;
155              
156             }
157              
158             # {{{ _init_dbi
159              
160             sub _init_dbi {
161 0     0     my ($heap, $dsn, $queries) = @_;
162              
163 0 0         my $dbh = DBI->connect(@$dsn, { AutoCommit => 1, RaiseError => 0, PrintError => 0 }) or die DBI->errstr;
164 0           $heap->{dbh} = $dbh;
165              
166             #$dbh->{RowCacheSize} = 500;
167              
168 0 0         if (defined $queries) {
169 0           foreach (keys %$queries) {
170 0 0 0       if ($queries->{$_} =~ /insert|update|delete/i and DEBUG_NOUPDATE) {
171 0           $heap->{$_} = $queries->{$_};
172             } else {
173 0 0         $heap->{$_} = $dbh->prepare($queries->{$_}) or die $dbh->errstr;
174             }
175             }
176              
177 0           return;
178             }
179              
180             }
181              
182             # }}} _init_dbi
183              
184             1;
185              
186             __END__