File Coverage

blib/lib/Cassandra/Client/AsyncEV.pm
Criterion Covered Total %
statement 14 98 14.2
branch 0 28 0.0
condition 0 11 0.0
subroutine 5 22 22.7
pod 0 11 0.0
total 19 170 11.1


line stmt bran cond sub pod time code
1             package Cassandra::Client::AsyncEV;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::AsyncEV::VERSION = '0.13_006'; # TRIAL
4              
5 1     1   22 $Cassandra::Client::AsyncEV::VERSION = '0.13006';use 5.010;
  1         5  
6 1     1   8 use strict;
  1         8  
  1         32  
7 1     1   8 use warnings;
  1         3  
  1         46  
8              
9 1     1   8 use Time::HiRes qw(CLOCK_MONOTONIC);
  1         3  
  1         17  
10 1     1   131 use vars qw/@TIMEOUTS/;
  1         4  
  1         1275  
11              
12             sub new {
13 0     0 0   my ($class, %args)= @_;
14              
15 0           my $options= $args{options};
16              
17 0           require EV;
18              
19             return bless {
20 0   0       timer_granularity => ($options->{timer_granularity} || 0.1),
21             ev_read => {},
22             ev_write => {},
23             ev_timeout => undef,
24             fh_to_obj => {},
25             timeouts => [],
26             ev => EV::Loop->new(),
27             }, $class;
28             }
29              
30             sub register {
31 0     0 0   my ($self, $fh, $connection)= @_;
32 0           $self->{fh_to_obj}{$fh}= $connection;
33 0           return;
34             }
35              
36             sub unregister {
37 0     0 0   my ($self, $fh)= @_;
38 0           delete $self->{fh_to_obj}{$fh};
39 0 0 0       if ($self->{timeouts} && grep { $_->[1] == $fh && !$_->[3] } @{$self->{timeouts}}) {
  0 0          
  0            
40 0           warn 'In unregister(): not all timeouts were dismissed!';
41             }
42 0           @{$self->{timeouts}}= grep { $_->[1] != $fh } @{$self->{timeouts}};
  0            
  0            
  0            
43 0 0         undef $self->{ev_timeout} unless @{$self->{timeouts}};
  0            
44 0           return;
45             }
46              
47             sub register_read {
48 0     0 0   my ($self, $fh)= @_;
49 0 0         my $connection= $self->{fh_to_obj}{$fh} or die;
50              
51 0     0     $self->{ev_read}{$fh}= $self->{ev}->io( $fh, &EV::READ, sub { $connection->can_read } );
  0            
52 0           return;
53             }
54              
55             sub register_write {
56 0     0 0   my ($self, $fh)= @_;
57 0 0         my $connection= $self->{fh_to_obj}{$fh} or die;
58              
59 0     0     $self->{ev_write}{$fh}= $self->{ev}->io( $fh, &EV::WRITE, sub { $connection->can_write } );
  0            
60 0           return;
61             }
62              
63             sub unregister_read {
64 0     0 0   my ($self, $fh)= @_;
65 0           undef $self->{ev_read}{$fh};
66              
67 0           return;
68             }
69              
70             sub unregister_write {
71 0     0 0   my ($self, $fh)= @_;
72 0           undef $self->{ev_write}{$fh};
73              
74 0           return;
75             }
76              
77             sub deadline {
78 0     0 0   my ($self, $fh, $id, $timeout)= @_;
79 0           local *TIMEOUTS= $self->{timeouts};
80              
81 0 0         if (!$self->{ev_timeout}) {
82             $self->{ev_timeout}= $self->{ev}->timer( $self->{timer_granularity}, $self->{timer_granularity}, sub {
83 0     0     $self->handle_timeouts(Time::HiRes::clock_gettime(CLOCK_MONOTONIC));
84 0           } );
85             }
86              
87 0           my $curtime= Time::HiRes::clock_gettime(CLOCK_MONOTONIC);
88 0           my $deadline= $curtime + $timeout;
89 0           my $additem= [ $deadline, $fh, $id, 0 ];
90              
91 0 0 0       if (@TIMEOUTS && $TIMEOUTS[-1][0] > $deadline) {
92             # Grumble... that's slow
93 0           push @TIMEOUTS, $additem;
94 0           @TIMEOUTS= sort { $a->[0] <=> $b->[0] } @TIMEOUTS;
  0            
95             } else {
96             # Common case
97 0           push @TIMEOUTS, $additem;
98             }
99              
100 0           return \($additem->[3]);
101             }
102              
103             sub handle_timeouts {
104 0     0 0   my ($self, $curtime)= @_;
105              
106 0           local *TIMEOUTS= $self->{timeouts};
107              
108 0           my %triggered_read;
109 0   0       while (@TIMEOUTS && $curtime >= $TIMEOUTS[0][0]) {
110 0           my $item= shift @TIMEOUTS;
111 0 0         if (!$item->[3]) { # If it timed out
112 0           my ($deadline, $fh, $id, $timedout)= @$item;
113 0           my $obj= $self->{fh_to_obj}{$fh};
114 0 0         $obj->can_read unless $triggered_read{$fh}++;
115 0 0         $obj->can_timeout($id) unless $item->[3]; # We may have received an answer...
116             }
117             }
118              
119 0 0         if (!@TIMEOUTS) {
120 0           $self->{ev_timeout}= undef;
121             }
122              
123 0           return;
124             }
125              
126             sub timer {
127 0     0 0   my ($self, $callback, $wait)= @_;
128 0           my $t; $t= $self->{ev}->timer($wait, 0, sub {
129 0     0     undef $t;
130 0           $callback->();
131 0           });
132             }
133              
134             # $something->($async->wait(my $w)); my ($error, $result)= $w->();
135             sub wait {
136 0     0 0   my ($self)= @_;
137 0           my $output= \$_[1];
138              
139 0           my ($done, $in_run);
140 0           my @output;
141             my $callback= sub {
142 0     0     $done= 1;
143 0           @output= @_;
144 0 0         $self->{ev}->break() if $in_run;
145 0           };
146              
147             $$output= sub {
148 0 0   0     if ($self->{in_wait}) {
149 0           die "Unable to recursively wait for callbacks; are you doing synchronous Cassandra queries from asynchronous callbacks?";
150             }
151 0           local $self->{in_wait}= 1;
152              
153 0           $in_run= 1;
154 0 0         $self->{ev}->run unless $done;
155 0           return @output;
156 0           };
157              
158 0           return $callback;
159             }
160              
161             1;
162              
163             __END__