File Coverage

blib/lib/Data/Model/Driver/Queue/Q4M.pm
Criterion Covered Total %
statement 12 90 13.3
branch 0 28 0.0
condition 0 13 0.0
subroutine 4 11 36.3
pod 0 5 0.0
total 16 147 10.8


line stmt bran cond sub pod time code
1             package Data::Model::Driver::Queue::Q4M;
2 1     1   158744 use strict;
  1         2  
  1         33  
3 1     1   5 use warnings;
  1         3  
  1         26  
4 1     1   5 use base 'Data::Model::Driver::DBI';
  1         2  
  1         767  
5              
6 1     1   8 use Carp ();
  1         4  
  1         963  
7             $Carp::Internal{(__PACKAGE__)}++;
8              
9 0     0 0   sub timeout { $_[0]->{timeout} }
10              
11             sub _create_arguments {
12 0     0     my $arg_length = scalar(@_);
13 0           my $timeout;
14             my %callbacks;
15 0           my @queue_tables;
16 0           for (my $i = 0; $i < $arg_length; $i++) {
17 0           my($table, $value) = ($_[$i], $_[$i + 1]);
18 0 0 0       if (ref($value) eq 'CODE') {
    0          
19             # register callback
20 0           push @queue_tables, $table;
21 0           $callbacks{$table} = $value;
22              
23             } elsif ($table eq 'timeout' && $value =~ /\A[0-9]+\z/) {
24             # timeout
25 0           $timeout = $value;
26             }
27 0           $i++;
28             }
29 0           (\@queue_tables, \%callbacks, $timeout);
30             }
31              
32             sub queue_wait {
33 0     0 0   my($self, $timeout, @tables) = @_;
34              
35 0           my $dbh = $self->r_handle;
36 0           my $sql = sprintf 'SELECT queue_wait(%s)', join(', ', (('?') x (scalar(@tables) + 1)));
37 0           my $sth = $dbh->prepare_cached($sql);
38              
39             # bind params
40 0           my $i = 1;
41 0           for my $table (@tables) {
42 0           $sth->bind_param($i++, $table, undef);
43             }
44 0           $sth->bind_param($i, $timeout, undef);
45              
46 0           $sth->execute;
47 0           $sth->bind_columns(undef, \my $retcode);
48              
49 0           my $rv = $sth->fetch;
50 0           $sth->finish;
51 0           undef $sth;
52              
53 0 0         unless (defined $retcode) { # queue_wait return code is NULL is illegal table name
54 0           Carp::croak "no created queue table";
55             }
56 0 0 0       return 0 unless $rv && $retcode;
57 0           return $retcode;
58             }
59              
60             sub queue_abort {
61 0     0 0   my $self = shift;
62              
63 0           my $dbh = $self->r_handle;
64 0           my $sql = 'SELECT queue_abort()';
65 0           my $sth = $dbh->prepare($sql);
66 0           $sth->execute;
67 0           $self->{is_aborted} = 1;
68             }
69              
70             sub queue_end {
71 0     0 0   my $self = shift;
72              
73 0           my $dbh = $self->r_handle;
74 0           my $sql = 'SELECT queue_end()';
75 0           my $sth = $dbh->prepare($sql);
76 0           $sth->execute;
77             }
78              
79             sub queue_running {
80 0     0 0   my($self, $c) = (shift, shift);
81 0           $self->{is_aborted} = 0;
82 0           my $arg_length = scalar(@_);
83 0 0         Carp::croak 'illegal parameter' if $arg_length % 2;
84              
85             # create table attributes
86 0           my($queue_tables, $callbacks, $timeout) = _create_arguments(@_);
87 0 0         Carp::croak 'required is callback handler' unless @{ $queue_tables };
  0            
88              
89 0           my %schema = map { $_ => 1 } $c->schema_names;
  0            
90 0           for my $table (@{ $queue_tables }) {
  0            
91 0           my($name) = split /:/, $table;
92 0 0         Carp::croak "'$name' is missing model name" unless $schema{$name};
93             }
94              
95 0   0       $timeout ||= $self->timeout || 60;
      0        
96              
97             # queue_wait
98 0           my $table_id = $self->queue_wait($timeout, @{ $queue_tables });
  0            
99 0 0         return unless $table_id;
100              
101             # get record
102 0           my $running_table = $queue_tables->[$table_id - 1];
103 0           my($real_table) = split /:/, $running_table;
104 0           my($row) = $c->get( $real_table );
105 0 0         unless ($row) {
106 0           $self->queue_abort;
107 0           return;
108             }
109              
110             # running callback
111 0           eval {
112 0           $callbacks->{$running_table}->($row);
113             };
114 0 0         if ($@) {
115 0 0         $self->queue_abort unless $self->{is_aborted};
116 0           die $@; # throwing exception
117             }
118 0 0         return if $self->{is_aborted};
119              
120 0           $self->queue_end;
121 0           return $real_table;
122             }
123              
124             # for schema
125             sub _as_sql_hook {
126 0     0     my $self = shift;
127              
128 0 0         if ($_[1] eq 'get_table_attributes') {
129 0           my $ret = $self->dbd->_as_sql_hook(@_);
130 0 0         unless ($ret =~ s/(\A|\W)\s*ENGINE\s*=\s*\w+\s*(\z|\W)/${1}TYPE=QUEUE${2}/) {
131 0   0       $ret ||= 'ENGINE=QUEUE';
132             }
133 0           return $ret;
134             } else {
135 0           return $self->dbd->_as_sql_hook(@_);
136             }
137             }
138              
139             1;
140              
141              
142             __END__