File Coverage

blib/lib/POE/Resource/Events.pm
Criterion Covered Total %
statement 136 150 90.6
branch 57 72 79.1
condition 22 31 70.9
subroutine 21 22 95.4
pod n/a
total 236 275 85.8


line stmt bran cond sub pod time code
1             # Data and accessors to manage POE's events.
2              
3             package POE::Resource::Events;
4              
5 175     175   762 use vars qw($VERSION);
  175         236  
  175         9245  
6             $VERSION = '1.365'; # NOTE - Should be #.### (three decimal places)
7              
8             # These methods are folded into POE::Kernel;
9             package POE::Kernel;
10              
11 175     175   963 use strict;
  175         249  
  175         313460  
12              
13             # A local copy of the queue so we can manipulate it directly.
14             my $kr_queue;
15              
16             my %event_count;
17             # ( $session_id => $count,
18             # ...,
19             # );
20              
21             my %post_count;
22             # ( $session_id => $count,
23             # ...,
24             # );
25              
26             ### Begin-run initialization.
27              
28             sub _data_ev_initialize {
29 175     175   323 my ($self, $queue) = @_;
30 175         397 $kr_queue = $queue;
31             }
32              
33             ### End-run leak checking.
34              
35             sub _data_ev_relocate_kernel_id {
36 4     4   17 my ($self, $old_id, $new_id) = @_;
37              
38 4 100       51 $event_count{$new_id} = delete $event_count{$old_id}
39             if exists $event_count{$old_id};
40 4 50       44 $post_count{$new_id} = delete $post_count{$old_id}
41             if exists $post_count{$old_id};
42             }
43              
44             sub _data_ev_finalize {
45 190     190   1261 my $finalized_ok = 1;
46 190         825 while (my ($ses_id, $cnt) = each(%event_count)) {
47 0         0 $finalized_ok = 0;
48 0         0 _warn("!!! Leaked event-to count: $ses_id = $cnt\n");
49             }
50              
51 190         1088 while (my ($ses_id, $cnt) = each(%post_count)) {
52 0         0 $finalized_ok = 0;
53 0         0 _warn("!!! Leaked event-from count: $ses_id = $cnt\n");
54             }
55 190         358 return $finalized_ok;
56             }
57              
58             ### Enqueue an event.
59              
60             sub FIFO_TIME_EPSILON () { 0.000001 }
61             my $last_fifo_time = monotime();
62              
63             sub _data_ev_enqueue {
64             my (
65 5399     5399   15363 $self,
66             $session, $source_session, $event, $type, $etc,
67             $file, $line, $fromstate, $time, $delta, $priority
68             ) = @_;
69              
70 5399         14063 my $sid = $session->ID;
71              
72 5398         6207 if (ASSERT_DATA) {
73             unless ($self->_data_ses_exists($sid)) {
74             _trap(
75             " can't enqueue event ``$event'' for nonexistent",
76             $self->_data_alias_loggable($sid)
77             );
78             }
79             }
80              
81             # This is awkward, but faster than using the fields individually.
82 5398         15858 my $event_to_enqueue = [ @_[(1+EV_SESSION) .. (1+EV_FROMSTATE)] ];
83 1266 100       2574 if( defined $time ) {
84 4140         12757 $event_to_enqueue->[EV_WALLTIME] = $time;
85 4140         7433 $event_to_enqueue->[EV_DELTA] = $delta;
86 1515   100     2900 $priority ||= wall2mono( $time + ($delta||0) );
      100        
87             }
88             else {
89 2765   66     6565 $priority ||= monotime();
90             }
91              
92 2773         11207 my $new_id;
93 3891         17335 my $old_head_priority = $kr_queue->get_next_priority();
94              
95 5398 100       7572 unless ($type & ET_MASK_DELAYED) {
96 4565 100       13911 $priority = $last_fifo_time + FIFO_TIME_EPSILON if $priority <= $last_fifo_time;
97 4565         9391 $last_fifo_time = $priority;
98             }
99              
100 3438         9647 $new_id = $kr_queue->enqueue($priority, $event_to_enqueue);
101 3438         5645 $event_to_enqueue->[EV_SEQ] = $new_id;
102              
103             #_carp( Carp::longmess( " priority is much to far in the future" ) ) if $priority > 1354569908;
104 5398         11971 if (TRACE_EVENTS ) {
105             _warn(
106             " enqueued event $new_id ``$event'' from ",
107             $self->_data_alias_loggable($source_session->ID), " to ",
108             $self->_data_alias_loggable($sid),
109             " at $time, priority=$priority"
110             );
111             }
112              
113 5398 100       12347 unless (defined $old_head_priority) {
    100          
114 4333         4595 $self->loop_resume_time_watcher($priority);
115             }
116             elsif ($priority < $old_head_priority) {
117 4714         16032 $self->loop_reset_time_watcher($priority);
118             }
119              
120             # This is the counterpart to _data_ev_refcount_dec(). It's only
121             # used in one place, so it's not in its own function.
122              
123 5036 100       26123 $self->_data_ses_refcount_inc($sid) unless $event_count{$sid}++;
124              
125 1922 100       6002 return $new_id if $sid eq $source_session->ID();
126              
127 1054 100       7917 $self->_data_ses_refcount_inc($source_session->ID) unless (
128             $post_count{$source_session->ID}++
129             );
130              
131 4231         15959 return $new_id;
132             }
133              
134             sub _data_ev_set
135             {
136 3321     0   319013 my( $self, $alarm_id, $my_alarm, $time, $pri, $delta ) = @_;
137              
138 178         27872 my $event = (
139 200         561 grep { $_->[1] == $alarm_id }
140             $kr_queue->peek_items( $my_alarm )
141             )[0];
142              
143 0 100       0 return unless $event;
144              
145 0         0 my $payload = $event->[ITEM_PAYLOAD];
146              
147             # XXX - However, if there has been a clock skew, the priority will
148             # have changed and we should recalculate priority from time+delta
149              
150 0 0 0     0 $delta = $payload->[EV_DELTA] || 0 unless defined $delta;
151 0         0 $kr_queue->set_priority( $alarm_id, $my_alarm, $pri+$delta );
152 0         0 $payload->[EV_WALLTIME] = $time;
153 0         0 $payload->[EV_DELTA] = $delta;
154              
155 0   0     0 return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
      0        
156             }
157              
158             sub _data_ev_adjust
159             {
160 30     30   52 my( $self, $alarm_id, $my_alarm, $time, $delta ) = @_;
161              
162             # XXX - However, if there has been a clock skew, the priority will
163             # have changed and we should recalculate priority from time+delta
164 30 50       47 if( $time ) {
165             # PG - We are never invoked with $time anyway.
166 0         0 $kr_queue->set_priority( $alarm_id, $my_alarm, $time+$delta );
167             }
168             else {
169 30         85 $kr_queue->adjust_priority( $alarm_id, $my_alarm, $delta );
170             }
171              
172 3470         4266 my $event = (
173 30         75 grep { $_->[1] == $alarm_id }
174             $kr_queue->peek_items( $my_alarm )
175             )[0];
176              
177 30 50       152 return unless $event;
178              
179 30         38 my $payload = $event->[ITEM_PAYLOAD];
180              
181 30 50       62 $payload->[EV_WALLTIME] = $time if $time;
182 30 50       67 $payload->[EV_DELTA] += $delta if $delta;
183              
184 30   50     231 return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
      100        
185             }
186              
187             ### Remove events sent to or from a specific session.
188              
189             sub _data_ev_clear_session {
190 777     777   2272 my ($self, $sid) = @_;
191              
192             # Events sent to the session.
193 777         1317 PENDING: {
194 777         866 my $pending_count = $event_count{$sid};
195 777 100       2177 last PENDING unless $pending_count;
196              
197 221         2078 foreach (
198             $kr_queue->remove_items(
199 243     243   1013 sub { $_[0][EV_SESSION]->ID() eq $sid },
200             $pending_count
201             )
202             ) {
203 226         1076 $self->_data_ev_refcount_dec(
204 226         368 @{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
205             );
206 226         621 $pending_count--;
207             }
208              
209             # TODO - fork() can make this go negative on some systems.
210 221 50       1593 last PENDING unless $pending_count;
211              
212 0 0       0 croak "lingering pending count: $pending_count" if $pending_count;
213             }
214              
215             # Events sent by the session.
216             SENT: {
217 777         876 my $sent_count = $post_count{$sid};
  777         1182  
218 777 100       1790 last SENT unless $sent_count;
219              
220 3         23 foreach (
221             $kr_queue->remove_items(
222 3     3   13 sub { $_[0][EV_SOURCE]->ID() eq $sid },
223             $sent_count
224             )
225             ) {
226 3         42 $self->_data_ev_refcount_dec(
227 3         6 @{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
228             );
229 3         9 $sent_count--;
230             }
231              
232 3 50       23 last SENT unless $sent_count;
233              
234 0 0       0 croak "lingering sent count: $sent_count" if $sent_count;
235             }
236              
237 777 50       2051 croak "lingering event count" if delete $event_count{$sid};
238 777 50       2393 croak "lingering post count" if delete $post_count{$sid};
239             }
240              
241             # TODO Alarm maintenance functions may move out to a separate
242             # POE::Resource module in the future. Why? Because alarms may
243             # eventually be managed by something other than the event queue.
244             # Especially if we incorporate a proper Session scheduler. Be sure to
245             # move the tests to a corresponding t/res/*.t file.
246              
247             ### Remove a specific alarm by its name. This is in the events
248             ### section because alarms are currently implemented as events with
249             ### future due times.
250              
251             sub _data_ev_clear_alarm_by_name {
252 3585     3585   5540 my ($self, $sid, $alarm_name) = @_;
253              
254             my $my_alarm = sub {
255 798945 100   798945   1198971 return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
256 796075 100       1191858 return 0 unless $_[0]->[EV_SESSION]->ID() eq $sid;
257 793147 100       2689065 return 0 unless $_[0]->[EV_NAME] eq $alarm_name;
258 1380         2909 return 1;
259 3585         17353 };
260              
261 3585         11025 foreach ($kr_queue->remove_items($my_alarm)) {
262 1380         1781 $self->_data_ev_refcount_dec(@{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]);
  1380         4217  
263             }
264             }
265              
266             ### Remove a specific alarm by its ID. This is in the events section
267             ### because alarms are currently implemented as events with future due
268             ### times. TODO It's possible to remove non-alarms; is that wrong?
269              
270             sub _data_ev_clear_alarm_by_id {
271 229     229   259 my ($self, $sid, $alarm_id) = @_;
272              
273             my $my_alarm = sub {
274 228     228   476 $_[0]->[EV_SESSION]->ID() eq $sid;
275 229         777 };
276              
277 229         627 my ($pri, $id, $event) = $kr_queue->remove_item($alarm_id, $my_alarm);
278 229 100       474 return unless defined $pri;
279              
280 228         207 if (TRACE_EVENTS) {
281             _warn(
282             " removed event $id ``", $event->[EV_NAME], "'' to ",
283             $self->_data_alias_loggable($sid), " at $pri"
284             );
285             }
286              
287 228         782 $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );
288 228   100     956 my $time = $event->[EV_WALLTIME] + ($event->[EV_DELTA]||0);
289 228         860 return ($time, $event);
290             }
291              
292             ### Remove all the alarms for a session. Whoot!
293              
294             sub _data_ev_clear_alarm_by_session {
295 221     4   950 my ($self, $sid) = @_;
296              
297             my $my_alarm = sub {
298 30 100   30   71 return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
299 26 100       57 return 0 unless $_[0]->[EV_SESSION]->ID() eq $sid;
300 5         13 return 1;
301 4         21 };
302              
303 4         5 my @removed;
304 4         14 foreach ($kr_queue->remove_items($my_alarm)) {
305 5         13 my ($pri, $event) = @$_[ITEM_PRIORITY, ITEM_PAYLOAD];
306 5         16 $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );
307 5   100     42 my $time = ($event->[EV_WALLTIME]||0) + ($event->[EV_DELTA]||0);
      50        
308 5         10 push @removed, [ $event->[EV_NAME], $time, @{$event->[EV_ARGS]} ];
  5         15  
309             }
310              
311 4         25 return @removed;
312             }
313              
314             ### Decrement a post refcount
315              
316             sub _data_ev_refcount_dec {
317 5251     5251   7248 my ($self, $source_session, $dest_session) = @_;
318              
319 5251         15936 my ($source_id, $dest_id) = ($source_session->ID, $dest_session->ID);
320              
321 5251         5849 if (ASSERT_DATA) {
322             _trap $dest_session unless exists $event_count{$dest_id};
323             }
324              
325 5251 100       14170 $self->_data_ses_refcount_dec($dest_id) unless --$event_count{$dest_id};
326              
327 5251 100       16999 return if $dest_id eq $source_id;
328              
329 4113         16380 if (ASSERT_DATA) {
330             _trap $source_session unless exists $post_count{$source_id};
331             }
332              
333 299 100       783 $self->_data_ses_refcount_dec($source_id) unless --$post_count{$source_id};
334             }
335              
336             ### Fetch the number of pending events sent to a session.
337              
338             sub _data_ev_get_count_to {
339 4713     4513   6305 my ($self, $sid) = @_;
340 4713   100     22266 return $event_count{$sid} || 0;
341             }
342              
343             ### Fetch the number of pending events sent from a session.
344              
345             sub _data_ev_get_count_from {
346 4511     4511   6034 my ($self, $sid) = @_;
347 4511   100     18298 return $post_count{$sid} || 0;
348             }
349              
350             ### Dispatch events that are due for "now" or earlier.
351              
352             sub _data_ev_dispatch_due {
353 2714     2714   4166 my $self = shift;
354              
355 2714         2816 if (TRACE_EVENTS) {
356 1569     2279   9737 foreach ($kr_queue->peek_items(sub { 1 })) {
357             my @event = map { defined() ? $_ : "(undef)" } @{$_->[ITEM_PAYLOAD]};
358             _warn(
359             " time($_->[ITEM_PRIORITY]) id($_->[ITEM_ID]) ",
360             "event(@event)\n"
361             );
362             }
363             }
364              
365 2714         12609 my $now = monotime();
366 3864         7119 my $next_time;
367 3864   100     10743 while (
368             defined($next_time = $kr_queue->get_next_priority()) and
369             $next_time <= $now
370             ) {
371 22397         40414 my ($priority, $id, $event) = $kr_queue->dequeue_next();
372              
373 4119         7903 if (TRACE_EVENTS) {
374             _warn(" dispatching event $id ($event->[EV_NAME])");
375             }
376              
377             # TODO - Why can't we reverse these two lines?
378             # TODO - Reversing them could avoid entering and removing GC marks.
379 4119         36840 $self->_data_ev_refcount_dec($event->[EV_SOURCE], $event->[EV_SESSION]);
380              
381 2969 100       13170 if ($event->[EV_TYPE] & (ET_SIGNAL | ET_SIGDIE)) {
382 1409         1694 $self->_dispatch_signal_event(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);
  1409         5400  
383             }
384             else {
385 3129         6057 $self->_dispatch_event(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);
  3129         9001  
386             }
387              
388             # Stop the system if an unhandled exception occurred.
389             # This wipes out all sessions and associated resources.
390 3398 100       15859 next unless $POE::Kernel::kr_exception;
391 1584         9616 POE::Kernel->stop();
392             }
393              
394             # Sweep for dead sessions. The sweep may alter the next queue time.
395              
396 3143         9882 $self->_data_ses_gc_sweep();
397 1761         4430 $next_time = $kr_queue->get_next_priority();
398              
399             # Tell the event loop to wait for the next event, if there is one.
400             # Otherwise we're going to wait indefinitely for some other event.
401              
402 1761 100       4378 if (defined $next_time) {
403 2465         5150 $self->loop_reset_time_watcher($next_time);
404             }
405             else {
406 1873         7704 $self->loop_pause_time_watcher();
407             }
408             }
409              
410             1;
411              
412             __END__