File Coverage

blib/lib/Object/Remote/MiniLoop.pm
Criterion Covered Total %
statement 114 169 67.4
branch 19 36 52.7
condition 4 6 66.6
subroutine 13 45 28.8
pod 0 10 0.0
total 150 266 56.3


line stmt bran cond sub pod time code
1             package Object::Remote::MiniLoop;
2              
3 13     13   6714 use IO::Select;
  13         17484  
  13         688  
4 13     13   6717 use Time::HiRes qw(time);
  13         14034  
  13         62  
5 13     13   6788 use Object::Remote::Logging qw( :log :dlog router );
  13         31  
  13         63  
6 13     13   75 use Moo;
  13         15  
  13         178  
7              
8             BEGIN {
9 13     13   3963 $SIG{PIPE} = sub { log_debug { "Got a PIPE signal" } };
  0         0  
  0         0  
10              
11 13         44 router()->exclude_forwarding
12             }
13              
14             # this is ro because we only actually set it using local in sub run
15             has is_running => (is => 'ro', clearer => 'stop');
16             #maximum duration that select() will block - undef means indefinite,
17             #0 means no blocking, otherwise maximum time in seconds
18             has block_duration => ( is => 'rw' );
19              
20             has _read_watches => (is => 'ro', default => sub { {} });
21             has _read_select => (is => 'ro', default => sub { IO::Select->new });
22              
23             has _write_watches => (is => 'ro', default => sub { {} });
24             has _write_select => (is => 'ro', default => sub { IO::Select->new });
25              
26             has _timers => (is => 'ro', default => sub { [] });
27              
28             sub pass_watches_to {
29 0     0 0 0 my ($self, $new_loop) = @_;
30 0     0   0 log_debug { "passing watches to new run loop" };
  0         0  
31 0         0 foreach my $fh ($self->_read_select->handles) {
32             $new_loop->watch_io(
33             handle => $fh,
34 0         0 on_read_ready => $self->_read_watches->{$fh}
35             );
36             }
37 0         0 foreach my $fh ($self->_write_select->handles) {
38             $new_loop->watch_io(
39             handle => $fh,
40 0         0 on_write_ready => $self->_write_watches->{$fh}
41             );
42             }
43             }
44              
45             sub watch_io {
46 40     40 0 397 my ($self, %watch) = @_;
47 40         83 my $fh = $watch{handle};
48 40     0   293 Dlog_debug { "Adding IO watch for $_" } $fh;
  0         0  
49              
50 40 100       645 if (my $cb = $watch{on_read_ready}) {
51 20     0   131 log_trace { "IO watcher is registering with select for reading" };
  0         0  
52 20         289 $self->_read_select->add($fh);
53 20         853 $self->_read_watches->{$fh} = $cb;
54             }
55 40 100       153 if (my $cb = $watch{on_write_ready}) {
56 20     0   147 log_trace { "IO watcher is registering with select for writing" };
  0         0  
57 20         385 $self->_write_select->add($fh);
58 20         1335 $self->_write_watches->{$fh} = $cb;
59             }
60 40         512 return;
61             }
62              
63             sub unwatch_io {
64 21     21 0 102 my ($self, %watch) = @_;
65 21         57 my $fh = $watch{handle};
66 21     0   189 Dlog_debug { "Removing IO watch for $_" } $fh;
  0         0  
67 21 100       366 if ($watch{on_read_ready}) {
68 2     0   69 log_trace { "IO watcher is removing read from select()" };
  0         0  
69 2         27 $self->_read_select->remove($fh);
70 2         85 delete $self->_read_watches->{$fh};
71             }
72 21 100       78 if ($watch{on_write_ready}) {
73 19     0   145 log_trace { "IO watcher is removing write from select()" };
  0         0  
74 19         337 $self->_write_select->remove($fh);
75 19         1029 delete $self->_write_watches->{$fh};
76             }
77 21         1697 return;
78             }
79              
80             sub _sort_timers {
81 25     25   56 my ($self, @new) = @_;
82 25         65 my $timers = $self->_timers;
83              
84 25     0   170 log_trace { "Sorting timers" };
  0         0  
85              
86 25         256 @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, @new;
  25         66  
  36         100  
  25         141  
87 25         44 return;
88             }
89              
90             sub watch_time {
91 23     23 0 131 my ($self, %watch) = @_;
92 23         45 my $at;
93              
94 23     0   167 Dlog_trace { "watch_time() invoked with $_" } \%watch;
  0         0  
95              
96 23 100       375 if (exists($watch{every})) {
    50          
    0          
97 1         5 $at = time() + $watch{every};
98             } elsif (exists($watch{after})) {
99 22         129 $at = time() + $watch{after};
100             } elsif (exists($watch{at})) {
101 0         0 $at = $watch{at};
102             } else {
103 0         0 die "watch_time requires every, after or at";
104             }
105              
106 23 50       103 die "watch_time requires code" unless my $code = $watch{code};
107 23         97 my $timers = $self->_timers;
108 23         67 my $new = [ $at => $code, $watch{every} ];
109 23         91 $self->_sort_timers($new);
110 23     0   182 log_debug { "Created new timer with id '$new' that expires at '$at'" };
  0         0  
111 23         314 return "$new";
112             }
113              
114             sub unwatch_time {
115 1     1 0 4 my ($self, $id) = @_;
116 1     0   8 log_trace { "Removing timer with id of '$id'" };
  0         0  
117 1         25 @$_ = grep !($_ eq $id), @$_ for $self->_timers;
118 1         3 return;
119             }
120              
121             sub _next_timer_expires_delay {
122 713     713   756 my ($self) = @_;
123 713         1057 my $timers = $self->_timers;
124 713         1128 my $delay_max = $self->block_duration;
125              
126 713 50       1411 return $delay_max unless @$timers;
127 713         2809 my $duration = $timers->[0]->[0] - time;
128              
129 713     0   3152 log_trace { "next timer fires in '$duration' seconds" };
  0         0  
130              
131 713 50 33     9675 if ($duration < 0) {
    50          
132 0         0 $duration = 0;
133             } elsif (defined $delay_max && $duration > $delay_max) {
134 0         0 $duration = $delay_max;
135             }
136              
137 713         1326 return $duration;
138             }
139              
140             sub loop_once {
141 713     713 0 908 my ($self) = @_;
142 713         1303 my $read = $self->_read_watches;
143 713         1213 my $write = $self->_write_watches;
144 713         778 my $read_count = 0;
145 713         676 my $write_count = 0;
146 713         1908 my @c = caller;
147 713         1560 my $wait_time = $self->_next_timer_expires_delay;
148             log_trace {
149 0 0   0   0 sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
150             scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' )
151 713         3576 };
152 713         10779 my ($readable, $writeable) = IO::Select->select(
153             $self->_read_select, $self->_write_select, undef, $wait_time
154             );
155             log_trace {
156 0 0   0   0 my $readable_count = defined $readable ? scalar(@$readable) : 0;
157 0 0       0 my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
158 0         0 "Run loop: select returned readable:$readable_count writeable:$writable_count";
159 713         6780781 };
160             # I would love to trap errors in the select call but IO::Select doesn't
161             # differentiate between an error and a timeout.
162             # -- no, love, mst.
163              
164 713     0   11165 log_trace { "Reading from ready filehandles" };
  0         0  
165 713         7219 foreach my $fh (@$readable) {
166 193 50       1052 next unless $read->{$fh};
167 193         342 $read_count++;
168 193         985 $read->{$fh}();
169             #FIXME this is a rough workaround for race conditions that can cause deadlocks
170             #under load
171 193         407 last;
172             }
173 713     0   3156 log_trace { "Writing to ready filehandles" };
  0         0  
174 713         6808 foreach my $fh (@$writeable) {
175 515 50       1512 next unless $write->{$fh};
176 515         506 $write_count++;
177 515         1394 $write->{$fh}();
178             #FIXME this is a rough workaround for race conditions that can cause deadlocks
179             #under load
180 515         5308 last;
181             }
182              
183             #moving the timers above the read() section exposes a deadlock
184 713     0   3628 log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" };
  0         0  
185 713         8177 my $timers = $self->_timers;
186 713         1782 my $now = time();
187 713     0   3060 log_trace { "Checking timers" };
  0         0  
188 713   100     10442 while (@$timers and $timers->[0][0] <= $now) {
189 5         13 my $active = $timers->[0];
190 5     0   43 Dlog_trace { "Found timer that needs to be executed: '$active'" };
  0         0  
191              
192 5 100       79 if (defined($active->[2])) {
193             #handle the case of an 'every' timer
194 2         10 $active->[0] = time() + $active->[2];
195 2     0   11 Dlog_trace { "scheduling timer for repeat execution at $_"} $active->[0];
  0         0  
196 2         29 $self->_sort_timers;
197             } else {
198             #it doesn't repeat again so get rid of it
199 3         6 shift(@$timers);
200             }
201              
202             #execute the timer
203 5         23 $active->[1]->();
204             }
205              
206 713     0   3045 log_trace { "Run loop: single loop is completed" };
  0         0  
207 713         10146 return;
208             }
209              
210             sub want_run {
211 0     0 0 0 my ($self) = @_;
212 0     0   0 Dlog_debug { "Run loop: Incremeting want_running, is now $_" }
213 0         0 ++$self->{want_running};
214             }
215              
216             sub run_while_wanted {
217 0     0 0 0 my ($self) = @_;
218 0     0   0 log_debug { my $wr = $self->{want_running}; "Run loop: run_while_wanted() invoked; want_running: $wr" };
  0         0  
  0         0  
219 0         0 $self->loop_once while $self->{want_running};
220 0     0   0 log_debug { "Run loop: run_while_wanted() completed" };
  0         0  
221 0         0 return;
222             }
223              
224             sub want_stop {
225 0     0 0 0 my ($self) = @_;
226 0 0       0 if (! $self->{want_running}) {
227 0     0   0 log_debug { "Run loop: want_stop() was called but want_running was not true" };
  0         0  
228 0         0 return;
229             }
230 0     0   0 Dlog_debug { "Run loop: decrimenting want_running, is now $_" }
231 0         0 --$self->{want_running};
232             }
233              
234             sub run {
235 175     175 0 285 my ($self) = @_;
236 175     0   1018 log_trace { "Run loop: run() invoked" };
  0         0  
237 175         2061 local $self->{is_running} = 1;
238 175         665 while ($self->is_running) {
239 713         1656 $self->loop_once;
240             }
241 175     0   898 log_trace { "Run loop: run() completed" };
  0         0  
242 175         1963 return;
243             }
244              
245             1;