File Coverage

blib/lib/Object/Remote/MiniLoop.pm
Criterion Covered Total %
statement 114 169 67.4
branch 19 36 52.7
condition 4 6 66.6
subroutine 13 45 28.8
pod 0 10 0.0
total 150 266 56.3


line stmt bran cond sub pod time code
1             package Object::Remote::MiniLoop;
2              
3 13     13   5025 use IO::Select;
  13         17505  
  13         530  
4 13     13   4735 use Time::HiRes qw(time);
  13         12879  
  13         51  
5 13     13   6044 use Object::Remote::Logging qw( :log :dlog router );
  13         33  
  13         61  
6 13     13   83 use Moo;
  13         39  
  13         63  
7              
8             BEGIN {
9 13     13   6081 $SIG{PIPE} = sub { log_debug { "Got a PIPE signal" } };
  0         0  
  0         0  
10              
11 13         63 router()->exclude_forwarding
12             }
13              
14             # this is ro because we only actually set it using local in sub run
15             has is_running => (is => 'ro', clearer => 'stop');
16             #maximum duration that select() will block - undef means indefinite,
17             #0 means no blocking, otherwise maximum time in seconds
18             has block_duration => ( is => 'rw' );
19              
20             has _read_watches => (is => 'ro', default => sub { {} });
21             has _read_select => (is => 'ro', default => sub { IO::Select->new });
22              
23             has _write_watches => (is => 'ro', default => sub { {} });
24             has _write_select => (is => 'ro', default => sub { IO::Select->new });
25              
26             has _timers => (is => 'ro', default => sub { [] });
27              
28             sub pass_watches_to {
29 0     0 0 0 my ($self, $new_loop) = @_;
30 0     0   0 log_debug { "passing watches to new run loop" };
  0         0  
31 0         0 foreach my $fh ($self->_read_select->handles) {
32             $new_loop->watch_io(
33             handle => $fh,
34 0         0 on_read_ready => $self->_read_watches->{$fh}
35             );
36             }
37 0         0 foreach my $fh ($self->_write_select->handles) {
38             $new_loop->watch_io(
39             handle => $fh,
40 0         0 on_write_ready => $self->_write_watches->{$fh}
41             );
42             }
43             }
44              
45             sub watch_io {
46 40     40 0 624 my ($self, %watch) = @_;
47 40         154 my $fh = $watch{handle};
48 40     0   523 Dlog_debug { "Adding IO watch for $_" } $fh;
  0         0  
49              
50 40 100       635 if (my $cb = $watch{on_read_ready}) {
51 20     0   249 log_trace { "IO watcher is registering with select for reading" };
  0         0  
52 20         286 $self->_read_select->add($fh);
53 20         894 $self->_read_watches->{$fh} = $cb;
54             }
55 40 100       258 if (my $cb = $watch{on_write_ready}) {
56 20     0   246 log_trace { "IO watcher is registering with select for writing" };
  0         0  
57 20         464 $self->_write_select->add($fh);
58 20         1819 $self->_write_watches->{$fh} = $cb;
59             }
60 40         569 return;
61             }
62              
63             sub unwatch_io {
64 21     21 0 125 my ($self, %watch) = @_;
65 21         57 my $fh = $watch{handle};
66 21     0   257 Dlog_debug { "Removing IO watch for $_" } $fh;
  0         0  
67 21 100       328 if ($watch{on_read_ready}) {
68 2     0   19 log_trace { "IO watcher is removing read from select()" };
  0         0  
69 2         29 $self->_read_select->remove($fh);
70 2         113 delete $self->_read_watches->{$fh};
71             }
72 21 100       89 if ($watch{on_write_ready}) {
73 19     0   140 log_trace { "IO watcher is removing write from select()" };
  0         0  
74 19         294 $self->_write_select->remove($fh);
75 19         1002 delete $self->_write_watches->{$fh};
76             }
77 21         1053 return;
78             }
79              
80             sub _sort_timers {
81 25     25   103 my ($self, @new) = @_;
82 25         84 my $timers = $self->_timers;
83              
84 25     0   204 log_trace { "Sorting timers" };
  0         0  
85              
86 25         228 @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, @new;
  25         119  
  36         164  
  25         209  
87 25         84 return;
88             }
89              
90             sub watch_time {
91 23     23 0 242 my ($self, %watch) = @_;
92 23         74 my $at;
93              
94 23     0   329 Dlog_trace { "watch_time() invoked with $_" } \%watch;
  0         0  
95              
96 23 100       337 if (exists($watch{every})) {
    50          
    0          
97 1         6 $at = time() + $watch{every};
98             } elsif (exists($watch{after})) {
99 22         134 $at = time() + $watch{after};
100             } elsif (exists($watch{at})) {
101 0         0 $at = $watch{at};
102             } else {
103 0         0 die "watch_time requires every, after or at";
104             }
105              
106 23 50       143 die "watch_time requires code" unless my $code = $watch{code};
107 23         164 my $timers = $self->_timers;
108 23         74 my $new = [ $at => $code, $watch{every} ];
109 23         129 $self->_sort_timers($new);
110 23     0   304 log_debug { "Created new timer with id '$new' that expires at '$at'" };
  0         0  
111 23         316 return "$new";
112             }
113              
114             sub unwatch_time {
115 1     1 0 3 my ($self, $id) = @_;
116 1     0   15 log_trace { "Removing timer with id of '$id'" };
  0         0  
117 1         25 @$_ = grep !($_ eq $id), @$_ for $self->_timers;
118 1         4 return;
119             }
120              
121             sub _next_timer_expires_delay {
122 732     732   1084 my ($self) = @_;
123 732         1197 my $timers = $self->_timers;
124 732         1216 my $delay_max = $self->block_duration;
125              
126 732 50       1440 return $delay_max unless @$timers;
127 732         1919 my $duration = $timers->[0]->[0] - time;
128              
129 732     0   3003 log_trace { "next timer fires in '$duration' seconds" };
  0         0  
130              
131 732 50 33     7760 if ($duration < 0) {
    50          
132 0         0 $duration = 0;
133             } elsif (defined $delay_max && $duration > $delay_max) {
134 0         0 $duration = $delay_max;
135             }
136              
137 732         1405 return $duration;
138             }
139              
140             sub loop_once {
141 732     732 0 1233 my ($self) = @_;
142 732         1345 my $read = $self->_read_watches;
143 732         1264 my $write = $self->_write_watches;
144 732         949 my $read_count = 0;
145 732         874 my $write_count = 0;
146 732         1768 my @c = caller;
147 732         1531 my $wait_time = $self->_next_timer_expires_delay;
148             log_trace {
149 0 0   0   0 sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
150             scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' )
151 732         3376 };
152 732         8828 my ($readable, $writeable) = IO::Select->select(
153             $self->_read_select, $self->_write_select, undef, $wait_time
154             );
155             log_trace {
156 0 0   0   0 my $readable_count = defined $readable ? scalar(@$readable) : 0;
157 0 0       0 my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
158 0         0 "Run loop: select returned readable:$readable_count writeable:$writable_count";
159 732         6799455 };
160             # I would love to trap errors in the select call but IO::Select doesn't
161             # differentiate between an error and a timeout.
162             # -- no, love, mst.
163              
164 732     0   9102 log_trace { "Reading from ready filehandles" };
  0         0  
165 732         6404 foreach my $fh (@$readable) {
166 193 50       801 next unless $read->{$fh};
167 193         340 $read_count++;
168 193         792 $read->{$fh}();
169             #FIXME this is a rough workaround for race conditions that can cause deadlocks
170             #under load
171 193         565 last;
172             }
173 732     0   3025 log_trace { "Writing to ready filehandles" };
  0         0  
174 732         6209 foreach my $fh (@$writeable) {
175 534 50       1460 next unless $write->{$fh};
176 534         715 $write_count++;
177 534         1601 $write->{$fh}();
178             #FIXME this is a rough workaround for race conditions that can cause deadlocks
179             #under load
180 534         4678 last;
181             }
182              
183             #moving the timers above the read() section exposes a deadlock
184 732     0   3409 log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" };
  0         0  
185 732         6665 my $timers = $self->_timers;
186 732         1827 my $now = time();
187 732     0   2711 log_trace { "Checking timers" };
  0         0  
188 732   100     8319 while (@$timers and $timers->[0][0] <= $now) {
189 5         15 my $active = $timers->[0];
190 5     0   38 Dlog_trace { "Found timer that needs to be executed: '$active'" };
  0         0  
191              
192 5 100       72 if (defined($active->[2])) {
193             #handle the case of an 'every' timer
194 2         9 $active->[0] = time() + $active->[2];
195 2     0   11 Dlog_trace { "scheduling timer for repeat execution at $_"} $active->[0];
  0         0  
196 2         24 $self->_sort_timers;
197             } else {
198             #it doesn't repeat again so get rid of it
199 3         9 shift(@$timers);
200             }
201              
202             #execute the timer
203 5         42 $active->[1]->();
204             }
205              
206 732     0   2973 log_trace { "Run loop: single loop is completed" };
  0         0  
207 732         7668 return;
208             }
209              
210             sub want_run {
211 0     0 0 0 my ($self) = @_;
212 0     0   0 Dlog_debug { "Run loop: Incremeting want_running, is now $_" }
213 0         0 ++$self->{want_running};
214             }
215              
216             sub run_while_wanted {
217 0     0 0 0 my ($self) = @_;
218 0     0   0 log_debug { my $wr = $self->{want_running}; "Run loop: run_while_wanted() invoked; want_running: $wr" };
  0         0  
  0         0  
219 0         0 $self->loop_once while $self->{want_running};
220 0     0   0 log_debug { "Run loop: run_while_wanted() completed" };
  0         0  
221 0         0 return;
222             }
223              
224             sub want_stop {
225 0     0 0 0 my ($self) = @_;
226 0 0       0 if (! $self->{want_running}) {
227 0     0   0 log_debug { "Run loop: want_stop() was called but want_running was not true" };
  0         0  
228 0         0 return;
229             }
230 0     0   0 Dlog_debug { "Run loop: decrimenting want_running, is now $_" }
231 0         0 --$self->{want_running};
232             }
233              
234             sub run {
235 175     175 0 386 my ($self) = @_;
236 175     0   877 log_trace { "Run loop: run() invoked" };
  0         0  
237 175         1662 local $self->{is_running} = 1;
238 175         600 while ($self->is_running) {
239 732         1608 $self->loop_once;
240             }
241 175     0   826 log_trace { "Run loop: run() completed" };
  0         0  
242 175         1649 return;
243             }
244              
245             1;