File Coverage

blib/lib/POE/Resource/Events.pm
Criterion Covered Total %
statement 136 150 90.6
branch 57 72 79.1
condition 22 31 70.9
subroutine 21 22 95.4
pod n/a
total 236 275 85.8


line stmt bran cond sub pod time code
1             # Data and accessors to manage POE's events.
2              
3             package POE::Resource::Events;
4              
5 175     175   851 use vars qw($VERSION);
  175         296  
  175         10806  
6             $VERSION = '1.367'; # NOTE - Should be #.### (three decimal places)
7              
8             # These methods are folded into POE::Kernel;
9             package POE::Kernel;
10              
11 175     175   869 use strict;
  175         281  
  175         424105  
12              
13             # A local copy of the queue so we can manipulate it directly.
14             my $kr_queue;
15              
16             my %event_count;
17             # ( $session_id => $count,
18             # ...,
19             # );
20              
21             my %post_count;
22             # ( $session_id => $count,
23             # ...,
24             # );
25              
26             ### Begin-run initialization.
27              
28             sub _data_ev_initialize {
29 175     175   356 my ($self, $queue) = @_;
30 175         828 $kr_queue = $queue;
31             }
32              
33             ### End-run leak checking.
34              
35             sub _data_ev_relocate_kernel_id {
36 4     4   8 my ($self, $old_id, $new_id) = @_;
37              
38 4 100       60 $event_count{$new_id} = delete $event_count{$old_id}
39             if exists $event_count{$old_id};
40 4 50       23 $post_count{$new_id} = delete $post_count{$old_id}
41             if exists $post_count{$old_id};
42             }
43              
44             sub _data_ev_finalize {
45 191     191   1258 my $finalized_ok = 1;
46 191         960 while (my ($ses_id, $cnt) = each(%event_count)) {
47 0         0 $finalized_ok = 0;
48 0         0 _warn("!!! Leaked event-to count: $ses_id = $cnt\n");
49             }
50              
51 191         1313 while (my ($ses_id, $cnt) = each(%post_count)) {
52 0         0 $finalized_ok = 0;
53 0         0 _warn("!!! Leaked event-from count: $ses_id = $cnt\n");
54             }
55 191         424 return $finalized_ok;
56             }
57              
58             ### Enqueue an event.
59              
60             sub FIFO_TIME_EPSILON () { 0.000001 }
61             my $last_fifo_time = monotime();
62              
63             sub _data_ev_enqueue {
64             my (
65 5458     5458   18671 $self,
66             $session, $source_session, $event, $type, $etc,
67             $file, $line, $fromstate, $time, $delta, $priority
68             ) = @_;
69              
70 5458         15641 my $sid = $session->ID;
71              
72 5457         7466 if (ASSERT_DATA) {
73             unless ($self->_data_ses_exists($sid)) {
74             _trap(
75             " can't enqueue event ``$event'' for nonexistent",
76             $self->_data_alias_loggable($sid)
77             );
78             }
79             }
80              
81             # This is awkward, but faster than using the fields individually.
82 5457         18875 my $event_to_enqueue = [ @_[(1+EV_SESSION) .. (1+EV_FROMSTATE)] ];
83 1284 100       2989 if( defined $time ) {
84 4181         15382 $event_to_enqueue->[EV_WALLTIME] = $time;
85 4181         8681 $event_to_enqueue->[EV_DELTA] = $delta;
86 1515   100     3301 $priority ||= wall2mono( $time + ($delta||0) );
      100        
87             }
88             else {
89 2783   66     7489 $priority ||= monotime();
90             }
91              
92 2791         13302 my $new_id;
93 3950         20708 my $old_head_priority = $kr_queue->get_next_priority();
94              
95 5457 100       8203 unless ($type & ET_MASK_DELAYED) {
96 4606 100       17560 $priority = $last_fifo_time + FIFO_TIME_EPSILON if $priority <= $last_fifo_time;
97 4606         11174 $last_fifo_time = $priority;
98             }
99              
100 3488         12041 $new_id = $kr_queue->enqueue($priority, $event_to_enqueue);
101 3488         6285 $event_to_enqueue->[EV_SEQ] = $new_id;
102              
103             #_carp( Carp::longmess( " priority is much too far in the future" ) ) if $priority > 1354569908;
104 5457         15166 if (TRACE_EVENTS ) {
105             _warn(
106             " enqueued event $new_id ``$event'' from ",
107             $self->_data_alias_loggable($source_session->ID), " to ",
108             $self->_data_alias_loggable($sid),
109             " at $time, priority=$priority"
110             );
111             }
112              
113 5457 100       14883 unless (defined $old_head_priority) {
    100          
114 4374         5658 $self->loop_resume_time_watcher($priority);
115             }
116             elsif ($priority < $old_head_priority) {
117 4761         18353 $self->loop_reset_time_watcher($priority);
118             }
119              
120             # This is the counterpart to _data_ev_refcount_dec(). It's only
121             # used in one place, so it's not in its own function.
122              
123 5096 100       30396 $self->_data_ses_refcount_inc($sid) unless $event_count{$sid}++;
124              
125 1942 100       7698 return $new_id if $sid eq $source_session->ID();
126              
127 1075 100       9510 $self->_data_ses_refcount_inc($source_session->ID) unless (
128             $post_count{$source_session->ID}++
129             );
130              
131 4272         18380 return $new_id;
132             }
133              
134             sub _data_ev_set
135             {
136 3363     0   343239 my( $self, $alarm_id, $my_alarm, $time, $pri, $delta ) = @_;
137              
138 193         38495 my $event = (
139 215         722 grep { $_->[1] == $alarm_id }
140             $kr_queue->peek_items( $my_alarm )
141             )[0];
142              
143 0 100       0 return unless $event;
144              
145 0         0 my $payload = $event->[ITEM_PAYLOAD];
146              
147             # XXX - However, if there has been a clock skew, the priority will
148             # have changed and we should recalculate priority from time+delta
149              
150 0 0 0     0 $delta = $payload->[EV_DELTA] || 0 unless defined $delta;
151 0         0 $kr_queue->set_priority( $alarm_id, $my_alarm, $pri+$delta );
152 0         0 $payload->[EV_WALLTIME] = $time;
153 0         0 $payload->[EV_DELTA] = $delta;
154              
155 0   0     0 return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
      0        
156             }
157              
158             sub _data_ev_adjust
159             {
160 30     30   60 my( $self, $alarm_id, $my_alarm, $time, $delta ) = @_;
161              
162             # XXX - However, if there has been a clock skew, the priority will
163             # have changed and we should recalculate priority from time+delta
164 30 50       70 if( $time ) {
165             # PG - We are never invoked with $time anyway.
166 0         0 $kr_queue->set_priority( $alarm_id, $my_alarm, $time+$delta );
167             }
168             else {
169 30         115 $kr_queue->adjust_priority( $alarm_id, $my_alarm, $delta );
170             }
171              
172 3470         4091 my $event = (
173 30         99 grep { $_->[1] == $alarm_id }
174             $kr_queue->peek_items( $my_alarm )
175             )[0];
176              
177 30 50       138 return unless $event;
178              
179 30         44 my $payload = $event->[ITEM_PAYLOAD];
180              
181 30 50       66 $payload->[EV_WALLTIME] = $time if $time;
182 30 50       84 $payload->[EV_DELTA] += $delta if $delta;
183              
184 30   50     311 return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
      100        
185             }
186              
187             ### Remove events sent to or from a specific session.
188              
189             sub _data_ev_clear_session {
190 792     792   2176 my ($self, $sid) = @_;
191              
192             # Events sent to the session.
193 792         1524 PENDING: {
194 792         1062 my $pending_count = $event_count{$sid};
195 792 100       2412 last PENDING unless $pending_count;
196              
197 222         2957 foreach (
198             $kr_queue->remove_items(
199 244     244   1083 sub { $_[0][EV_SESSION]->ID() eq $sid },
200             $pending_count
201             )
202             ) {
203 227         1210 $self->_data_ev_refcount_dec(
204 227         463 @{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
205             );
206 227         755 $pending_count--;
207             }
208              
209             # TODO - fork() can make this go negative on some systems.
210 222 50       2310 last PENDING unless $pending_count;
211              
212 0 0       0 croak "lingering pending count: $pending_count" if $pending_count;
213             }
214              
215             # Events sent by the session.
216             SENT: {
217 792         1103 my $sent_count = $post_count{$sid};
  792         1465  
218 792 100       2965 last SENT unless $sent_count;
219              
220 3         55 foreach (
221             $kr_queue->remove_items(
222 3     3   15 sub { $_[0][EV_SOURCE]->ID() eq $sid },
223             $sent_count
224             )
225             ) {
226 3         14 $self->_data_ev_refcount_dec(
227 3         6 @{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
228             );
229 3         10 $sent_count--;
230             }
231              
232 3 50       29 last SENT unless $sent_count;
233              
234 0 0       0 croak "lingering sent count: $sent_count" if $sent_count;
235             }
236              
237 792 50       2454 croak "lingering event count" if delete $event_count{$sid};
238 792 50       4395 croak "lingering post count" if delete $post_count{$sid};
239             }
240              
241             # TODO Alarm maintenance functions may move out to a separate
242             # POE::Resource module in the future. Why? Because alarms may
243             # eventually be managed by something other than the event queue.
244             # Especially if we incorporate a proper Session scheduler. Be sure to
245             # move the tests to a corresponding t/res/*.t file.
246              
247             ### Remove a specific alarm by its name. This is in the events
248             ### section because alarms are currently implemented as events with
249             ### future due times.
250              
251             sub _data_ev_clear_alarm_by_name {
252 3604     3604   7092 my ($self, $sid, $alarm_name) = @_;
253              
254             my $my_alarm = sub {
255 798978 100   798978   1338715 return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
256 796103 100       1225359 return 0 unless $_[0]->[EV_SESSION]->ID() eq $sid;
257 793166 100       2804716 return 0 unless $_[0]->[EV_NAME] eq $alarm_name;
258 1382         3632 return 1;
259 3604         24064 };
260              
261 3604         13927 foreach ($kr_queue->remove_items($my_alarm)) {
262 1382         2618 $self->_data_ev_refcount_dec(@{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]);
  1382         7527  
263             }
264             }
265              
266             ### Remove a specific alarm by its ID. This is in the events section
267             ### because alarms are currently implemented as events with future due
268             ### times. TODO It's possible to remove non-alarms; is that wrong?
269              
270             sub _data_ev_clear_alarm_by_id {
271 229     229   348 my ($self, $sid, $alarm_id) = @_;
272              
273             my $my_alarm = sub {
274 228     228   642 $_[0]->[EV_SESSION]->ID() eq $sid;
275 229         939 };
276              
277 229         694 my ($pri, $id, $event) = $kr_queue->remove_item($alarm_id, $my_alarm);
278 229 100       652 return unless defined $pri;
279              
280 228         271 if (TRACE_EVENTS) {
281             _warn(
282             " removed event $id ``", $event->[EV_NAME], "'' to ",
283             $self->_data_alias_loggable($sid), " at $pri"
284             );
285             }
286              
287 228         1117 $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );
288 228   100     1169 my $time = $event->[EV_WALLTIME] + ($event->[EV_DELTA]||0);
289 228         1351 return ($time, $event);
290             }
291              
292             ### Remove all the alarms for a session. Whoot!
293              
294             sub _data_ev_clear_alarm_by_session {
295 221     4   1334 my ($self, $sid) = @_;
296              
297             my $my_alarm = sub {
298 30 100   30   68 return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
299 26 100       53 return 0 unless $_[0]->[EV_SESSION]->ID() eq $sid;
300 5         17 return 1;
301 4         26 };
302              
303 4         8 my @removed;
304 4         18 foreach ($kr_queue->remove_items($my_alarm)) {
305 5         14 my ($pri, $event) = @$_[ITEM_PRIORITY, ITEM_PAYLOAD];
306 5         18 $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );
307 5   100     51 my $time = ($event->[EV_WALLTIME]||0) + ($event->[EV_DELTA]||0);
      50        
308 5         10 push @removed, [ $event->[EV_NAME], $time, @{$event->[EV_ARGS]} ];
  5         23  
309             }
310              
311 4         36 return @removed;
312             }
313              
314             ### Decrement a post refcount
315              
316             sub _data_ev_refcount_dec {
317 5325     5325   10097 my ($self, $source_session, $dest_session) = @_;
318              
319 5325         17441 my ($source_id, $dest_id) = ($source_session->ID, $dest_session->ID);
320              
321 5325         6830 if (ASSERT_DATA) {
322             _trap $dest_session unless exists $event_count{$dest_id};
323             }
324              
325 5325 100       19382 $self->_data_ses_refcount_dec($dest_id) unless --$event_count{$dest_id};
326              
327 5325 100       21781 return if $dest_id eq $source_id;
328              
329 4169         22679 if (ASSERT_DATA) {
330             _trap $source_session unless exists $post_count{$source_id};
331             }
332              
333 314 100       888 $self->_data_ses_refcount_dec($source_id) unless --$post_count{$source_id};
334             }
335              
336             ### Fetch the number of pending events sent to a session.
337              
338             sub _data_ev_get_count_to {
339 4903     4688   7782 my ($self, $sid) = @_;
340 4903   100     28495 return $event_count{$sid} || 0;
341             }
342              
343             ### Fetch the number of pending events sent from a session.
344              
345             sub _data_ev_get_count_from {
346 4686     4686   7571 my ($self, $sid) = @_;
347 4686   100     23616 return $post_count{$sid} || 0;
348             }
349              
350             ### Dispatch events that are due for "now" or earlier.
351              
352             sub _data_ev_dispatch_due {
353 2771     2771   5298 my $self = shift;
354              
355 2771         3500 if (TRACE_EVENTS) {
356 1623     2329   12484 foreach ($kr_queue->peek_items(sub { 1 })) {
357             my @event = map { defined() ? $_ : "(undef)" } @{$_->[ITEM_PAYLOAD]};
358             _warn(
359             " time($_->[ITEM_PRIORITY]) id($_->[ITEM_ID]) ",
360             "event(@event)\n"
361             );
362             }
363             }
364              
365 2771         16741 my $now = monotime();
366 3913         7719 my $next_time;
367 3913   100     11228 while (
368             defined($next_time = $kr_queue->get_next_priority()) and
369             $next_time <= $now
370             ) {
371 22864         44685 my ($priority, $id, $event) = $kr_queue->dequeue_next();
372              
373 4186         8631 if (TRACE_EVENTS) {
374             _warn(" dispatching event $id ($event->[EV_NAME])");
375             }
376              
377             # TODO - Why can't we reverse these two lines?
378             # TODO - Reversing them could avoid entering and removing GC marks.
379 4186         43643 $self->_data_ev_refcount_dec($event->[EV_SOURCE], $event->[EV_SESSION]);
380              
381 3044 100       15028 if ($event->[EV_TYPE] & (ET_SIGNAL | ET_SIGDIE)) {
382 1467         2161 $self->_dispatch_signal_event(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);
  1467         6080  
383             }
384             else {
385 3200         7831 $self->_dispatch_event(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);
  3200         10396  
386             }
387              
388             # Stop the system if an unhandled exception occurred.
389             # This wipes out all sessions and associated resources.
390 3469 100       18461 next unless $POE::Kernel::kr_exception;
391 1638         11681 POE::Kernel->stop();
392             }
393              
394             # Sweep for dead sessions. The sweep may alter the next queue time.
395              
396 3196         11533 $self->_data_ses_gc_sweep();
397 1775         4649 $next_time = $kr_queue->get_next_priority();
398              
399             # Tell the event loop to wait for the next event, if there is one.
400             # Otherwise we're going to wait indefinitely for some other event.
401              
402 1775 100       12981 if (defined $next_time) {
403 2511         6058 $self->loop_reset_time_watcher($next_time);
404             }
405             else {
406 1904         9691 $self->loop_pause_time_watcher();
407             }
408             }
409              
410             1;
411              
412             __END__