File Coverage

blib/lib/Couchbase/Test/Async/Loop.pm
Criterion Covered Total %
statement 66 162 40.7
branch 0 42 0.0
condition 0 9 0.0
subroutine 22 38 57.8
pod 0 8 0.0
total 88 259 33.9


line stmt bran cond sub pod time code
1             package Couchbase::Test::Async::Loop;
2 2     2   6 use strict;
  2         4  
  2         58  
3 2     2   8 use warnings;
  2         2  
  2         92  
4 2     2   10 use Couchbase::Client::Async;
  2         2  
  2         30  
5 2     2   6 use Couchbase::Client::IDXConst;
  2         4  
  2         506  
6 2     2   10 use Couchbase::Client::Errors;
  2         4  
  2         184  
7              
8 2     2   8 use POE;
  2         4  
  2         10  
9 2     2   542 use POE::Kernel;
  2         4  
  2         8  
10 2     2   62 use POE::Session;
  2         4  
  2         6  
11 2     2   90 use Data::Dumper;
  2         4  
  2         78  
12 2     2   6 use Log::Fu { level => "info" };
  2         4  
  2         12  
13 2     2   2430 use Devel::Peek;
  2         760  
  2         10  
14 2     2   148 use Array::Assign;
  2         2  
  2         104  
15              
16 2     2   10 use base qw(POE::Sugar::Attributes);
  2         4  
  2         198  
17              
18             my $poe_kernel = "POE::Kernel";
19              
20             sub cbc_connect :Start {
21 0     0 0 0 $_[HEAP]->object->connect();
22 2     2   8 }
  2         4  
  2         10  
23              
24             sub unhandled :Event(_default) {
25 0     0 0 0 log_errf("Got unknown event %s", $_[ARG0]);
26 2     2   426 }
  2         2  
  2         6  
27              
28             sub got_error :Event {
29 0     0 0 0 log_errf("Got errnum=%d, errstr=%s",
30             $_[ARG0], $_[ARG1]);
31 0         0 $_[HEAP]->on_error(@_[ARG0,ARG1]);
32 2     2   382 }
  2         4  
  2         8  
33              
34              
35             #This would be an event-loop specific implementation of update_event
36             my %EVMETH_MAP = (
37             COUCHBASE_WRITE_EVENT, "write",
38             COUCHBASE_READ_EVENT, "read"
39             );
40              
41             sub _activate_events {
42 0     0     my ($cbc_flags, $dupfh, $opaque) = @_;
43 0           while (my ($ev,$meth) = each %EVMETH_MAP ) {
44 0 0         if($cbc_flags & $ev) {
45 0           log_debugf("Activating event %d on dupfd %d", $ev, fileno($dupfh));
46 0           $poe_kernel->${\"select_$meth"}($dupfh, "dispatch_event", $ev, $opaque);
  0            
47             }
48             }
49             }
50              
51             sub _deactivate_events {
52 0     0     my ($cbc_flags, $dupfh) = @_;
53 0           while (my ($ev,$meth) = each %EVMETH_MAP ) {
54 0 0         if($cbc_flags & $ev) {
55 0           log_debugf("Deactivating event %d on dupfd %d", $ev, fileno($dupfh));
56 0           $poe_kernel->${\"select_$meth"}($dupfh);
  0            
57             }
58             }
59             }
60              
61             sub _startstop_events {
62 0     0     my ($events,$prefix,$dupfh) = @_;
63 0           while (my ($ev,$meth) = each %EVMETH_MAP) {
64 0 0         if($events & $ev) {
65 0           log_debugf("Invoking $prefix: $meth on dupfd %d", fileno($dupfh));
66 0           $poe_kernel->${\"$prefix\_$meth"}($dupfh);
  0            
67             }
68             }
69             }
70              
71              
72             sub update_event :Event {
73 0     0 0 0 my ($evdata,$action,$flags) = @_[ARG0..ARG2];
74 0         0 my $dupfh = $evdata->[EVIDX_DUPFH];
75              
76 0 0 0     0 if($action == EVACTION_WATCH) {
    0          
    0          
77 0 0       0 if(!$dupfh) {
78 0         0 open $dupfh, ">&", $evdata->[EVIDX_FD];
79 0         0 _activate_events($flags, $dupfh, $evdata->[EVIDX_OPAQUE]);
80 0         0 $evdata->[EVIDX_DUPFH] = $dupfh;
81             } else {
82 0         0 my $events_do_delete = $evdata->[EVIDX_WATCHFLAGS] & (~$flags);
83 0         0 log_debugf("Old events=%x, new events = %x, delete events %x",
84             $evdata->[EVIDX_WATCHFLAGS], $flags, $events_do_delete);
85 0         0 _activate_events($flags, $dupfh, $evdata->[EVIDX_OPAQUE]);
86 0         0 _deactivate_events($events_do_delete, $dupfh);
87             }
88             } elsif ($action == EVACTION_UNWATCH) {
89 0 0       0 if(!$dupfh) {
90 0         0 warn("Unwatch requested on undefined dup'd filehandle");
91 0         0 return;
92             }
93 0         0 _deactivate_events($evdata->[EVIDX_WATCHFLAGS], $dupfh);
94             } elsif ($action == EVACTION_SUSPEND || $action == EVACTION_RESUME) {
95 0 0       0 if(!$dupfh) {
96 0         0 warn("suspend/resume requested on undefined dup'd filehandle. ".
97             "fd=".$evdata->[EVIDX_FD]);
98             }
99 0 0       0 my $prefix = $action == EVACTION_SUSPEND ? "pause" : "resume";
100 0         0 $prefix = "select_" . $prefix;
101 0         0 _startstop_events($evdata->[EVIDX_WATCHFLAGS], $prefix, $dupfh);
102             } else {
103 0         0 die("Unhandled action $action");
104             }
105 2     2   1220 }
  2         4  
  2         6  
106              
107             sub update_timer :Event {
108 0     0 0 0 my ($evdata,$action,$usecs) = @_[ARG0..ARG2];
109 0         0 my $timer_id = $evdata->[EVIDX_PLDATA];
110 0         0 my $seconds;
111              
112 0 0       0 if($usecs) {
113 0         0 $seconds = ($usecs / (1000*1000));
114             }
115 0 0       0 if($action == EVACTION_WATCH) {
116 0 0       0 if(defined $timer_id) {
117 0         0 log_debugf("Rescheduling timer %d in %0.5f seconds from now",
118             $timer_id, $seconds);
119 0         0 $poe_kernel->delay_adjust($timer_id, $seconds)
120             } else {
121 0         0 $timer_id = $poe_kernel->delay_set(
122             "dispatch_timeout", $seconds, $evdata->[EVIDX_OPAQUE]);
123 0         0 $evdata->[EVIDX_PLDATA] = $timer_id;
124 0         0 log_debugf("Scheduling timer %d for %0.5f seconds from now",
125             $timer_id, $seconds);
126             }
127             } else {
128 0 0       0 if(defined $timer_id) {
129 0         0 log_debug("Deletion requested for timer $timer_id.");
130 0         0 $poe_kernel->alarm_remove($timer_id);
131 0         0 $evdata->[EVIDX_PLDATA] = undef;
132             } else {
133 0         0 log_debug("Requested to delete non-existent timer ID");
134             }
135             }
136 2     2   646 }
  2         4  
  2         6  
137              
138             #this is what an event loop does in order to tell libcouchbase that an event
139             #has been received
140             sub dispatch_event :Event {
141 0     0 0 0 my ($flags,$opaque) = @_[ARG2..ARG3];
142 0         0 log_debugf("Flags=%d, opaque=%x", $flags, $opaque);
143 0         0 Couchbase::Client::Async->HaveEvent($flags, $opaque);
144 2     2   426 }
  2         4  
  2         6  
145              
146             sub dispatch_timeout :Event {
147 0     0 0 0 my $opaque = $_[ARG0];
148 0         0 my $flags = 0;
149 0         0 log_debugf("Dispatching timer.. opaque=%x", $opaque);
150 0         0 Couchbase::Client::Async->HaveEvent($flags, $opaque);
151 2     2   382 }
  2         4  
  2         6  
152              
153              
154             #### External interface
155              
156             use Class::XSAccessor {
157 2         20 constructor => 'new',
158             accessors => [qw(object alias on_ready on_error)]
159 2     2   352 };
  2         4  
160              
161             sub spawn {
162 0     0 0   my ($cls,$session_name,%options) = @_;
163 0 0         my $cb_ready = delete $options{on_ready}
164             or die ("Must have on_ready callback");
165 0           my $user_error_callback = delete $options{on_error};
166              
167             my $async = Couchbase::Client::Async->new({
168             %options,
169             cb_error =>
170 0     0     sub { $poe_kernel->post($session_name, "got_error", @_) },
171             cb_update_event =>
172 0     0     sub { $poe_kernel->call($session_name, "update_event", @_) },
173              
174             cb_waitdone => $cb_ready,
175              
176             cb_update_timer =>
177 0     0     sub { $poe_kernel->call($session_name, "update_timer", @_) }
178 0           });
179              
180 0           my $o = __PACKAGE__->new(alias => $session_name, object => $async,
181             on_error => $user_error_callback);
182 0           POE::Session->create(
183             heap => $o,
184             inline_states =>
185             POE::Sugar::Attributes->inline_states(__PACKAGE__, $session_name)
186             );
187 0           $async->connect();
188 0           return $o;
189             }
190              
191             sub _single_dispatch_common {
192 0     0     my ($result,$arg) = @_;
193 0           my ($key) = keys %$result;
194 0           my ($ret) = values %$result;
195              
196 0 0         if($arg->{callback}) {
197 0           $arg->{callback}->($key, $ret, $arg->{arg});
198             } else {
199 0           $poe_kernel->post($arg->{session}, $arg->{state},
200             $key, $ret,$arg->{arg});
201             }
202             }
203              
204             my %STR2CMD = (
205             set => PLCBA_CMD_SET,
206             cas => PLCBA_CMD_CAS,
207             add => PLCBA_CMD_ADD,
208             replace => PLCBA_CMD_REPLACE,
209             append => PLCBA_CMD_APPEND,
210             prepend => PLCBA_CMD_PREPEND,
211             get => PLCBA_CMD_GET,
212             lock => PLCBA_CMD_LOCK,
213             touch => PLCBA_CMD_TOUCH,
214             remove => PLCBA_CMD_REMOVE,
215             arithmetic => PLCBA_CMD_ARITHMETIC,
216             incr => PLCBA_CMD_INCR,
217             decr => PLCBA_CMD_DECR
218             );
219              
220             sub _catchall :Event(set, get, cas, add, replace, remove, arithmetic, incr, decr, replace, append, prepend)
221             {
222 0     0     my ($op_params, $cb_params) = @_[ARG0, ARG1];
223 0 0         if (!exists $STR2CMD{$_[STATE]}) {
224 0           die("Unknown command: ".$_[STATE]);
225             }
226              
227 0 0 0       if( $cb_params->{state} && (!$cb_params->{session}) ) {
228 0           $cb_params->{session} = $_[SENDER];
229             }
230              
231 0 0 0       unless($cb_params->{state} || $cb_params->{callback}) {
232 0           die("Must have either target state or CODE reference for notification");
233             }
234              
235 0 0         if($cb_params->{callback}) {
236 0 0         unless(ref $cb_params->{callback} eq 'CODE') {
237 0           die("Callback must be a CODE reference");
238             }
239             }
240              
241 0           my $cmdi = $STR2CMD{$_[STATE]};
242 0           $_[HEAP]->object->command(
243             $cmdi,
244             $op_params,
245             {
246             callback => \&_single_dispatch_common,
247             data => $cb_params,
248             type => CBTYPE_COMPLETION
249             }
250             );
251 2     2   1386 }
  2         6  
  2         6  
252              
253             1;