File Coverage

blib/lib/Net/SNMP/Dispatcher.pm
Criterion Covered Total %
statement 132 248 53.2
branch 37 122 30.3
condition 9 38 23.6
subroutine 26 40 65.0
pod 0 15 0.0
total 204 463 44.0


line stmt bran cond sub pod time code
1             # -*- mode: perl -*-
2             # ============================================================================
3              
4             package Net::SNMP::Dispatcher;
5              
6             # $Id: Dispatcher.pm,v 4.1 2010/09/10 00:01:22 dtown Rel $
7              
8             # Object that dispatches SNMP messages and handles the scheduling of events.
9              
10             # Copyright (c) 2001-2010 David M. Town
11             # All rights reserved.
12              
13             # This program is free software; you may redistribute it and/or modify it
14             # under the same terms as the Perl 5 programming language system itself.
15              
16             # ============================================================================
17              
18 1     1   6926 use strict;
  1         2  
  1         37  
19 1     1   672 use Errno;
  1         1262  
  1         44  
20              
21 1     1   511 use Net::SNMP::MessageProcessing();
  1         5  
  1         26  
22 1     1   7 use Net::SNMP::Message qw( TRUE FALSE );
  1         4  
  1         208  
23              
24             ## Version of the Net::SNMP::Dispatcher module
25              
26             our $VERSION = v4.0.1;
27              
28             ## Package variables
29              
30             our $INSTANCE; # Reference to our Singleton object
31              
32             our $DEBUG = FALSE; # Debug flag
33              
34             our $MESSAGE_PROCESSING; # Reference to the Message Processing object
35              
36             ## Event array indexes
37              
38 16     16   66 sub _ACTIVE { 0 } # State of the event
39 59     59   180 sub _TIME { 1 } # Execution time
40 12     12   32 sub _CALLBACK { 2 } # Callback reference
41 29     29   90 sub _PREVIOUS { 3 } # Previous event
42 50     50   165 sub _NEXT { 4 } # Next event
43              
44             BEGIN
45             {
46             # Use a higher resolution of time() and possibly a monotonically
47             # increasing time value if the Time::HiRes module is available.
48              
49 1 50   1   65 if (eval 'require Time::HiRes') {
50 1         2101 Time::HiRes->import('time');
51 1     1   6 no warnings;
  1         1  
  1         132  
52 1 50       242 if (eval 'Time::HiRes::clock_gettime(Time::HiRes::CLOCK_MONOTONIC())' > 0)
53             {
54             *time = sub () {
55 16     16   55 Time::HiRes::clock_gettime(Time::HiRes::CLOCK_MONOTONIC());
56 1         45 };
57             }
58             }
59              
60             # Validate the creation of the Message Processing object.
61              
62 1 50       13 if (!defined($MESSAGE_PROCESSING = Net::SNMP::MessageProcessing->instance()))
63             {
64 0         0 die 'FATAL: Failed to create Message Processing instance';
65             }
66             }
67              
68             # [public methods] -----------------------------------------------------------
69              
70             sub instance
71             {
72 1   33 1 0 662 return $INSTANCE ||= Net::SNMP::Dispatcher->_new();
73             }
74              
75             sub loop
76             {
77 1     1 0 198 my ($this) = @_;
78              
79 1 50       5 return TRUE if ($this->{_active});
80              
81 1         4 $this->{_active} = TRUE;
82              
83             # Process while there are events and file descriptor handlers.
84 1   66     5 while (defined $this->{_event_queue_h} || keys %{$this->{_descriptors}}) {
  1         7  
85 12         41 $this->_event_handle(undef);
86             }
87              
88 1         6 return $this->{_active} = FALSE;
89             }
90              
91             sub one_event
92             {
93 0     0 0 0 my ($this) = @_;
94              
95 0 0       0 return TRUE if ($this->{_active});
96              
97 0 0 0     0 if (defined $this->{_event_queue_h} || keys %{$this->{_descriptors}}) {
  0         0  
98 0         0 $this->{_active} = TRUE;
99 0         0 $this->_event_handle(0);
100 0         0 $this->{_active} = FALSE;
101             }
102              
103 0   0     0 return (defined $this->{_event_queue_h} || keys %{$this->{_descriptors}});
104             }
105              
106             sub activate
107             {
108 0     0 0 0 goto &loop;
109             }
110              
111             sub listen
112             {
113 0     0 0 0 goto &loop;
114             }
115              
116             sub send_pdu
117             {
118 0     0 0 0 my ($this, $pdu, $delay) = @_;
119              
120             # Clear any previous errors
121 0         0 $this->_error_clear();
122              
123 0 0 0     0 if ((@_ < 2) || !ref $pdu) {
124 0         0 return $this->_error('The required PDU object is missing or invalid');
125             }
126              
127             # If the Dispatcher is active and the delay value is negative,
128             # send the message immediately.
129              
130 0 0       0 if ($delay < 0) {
131 0 0       0 if ($this->{_active}) {
132 0         0 return $this->_send_pdu($pdu, $pdu->retries());
133             }
134 0         0 $delay = 0;
135             }
136              
137 0         0 $this->schedule($delay, [\&_send_pdu, $pdu, $pdu->retries()]);
138              
139 0         0 return TRUE;
140             }
141              
142             sub return_response_pdu
143             {
144 0     0 0 0 my ($this, $pdu) = @_;
145              
146 0         0 return $this->send_pdu($pdu, -1);
147             }
148              
149             sub msg_handle_alloc
150             {
151 0     0 0 0 return $MESSAGE_PROCESSING->msg_handle_alloc();
152             }
153              
154             sub schedule
155             {
156 4     4 0 775 my ($this, $time, $callback) = @_;
157              
158 4         12 return $this->_event_create($time, $this->_callback_create($callback));
159             }
160              
161             sub cancel
162             {
163 0     0 0 0 my ($this, $event) = @_;
164              
165 0         0 return $this->_event_delete($event);
166             }
167              
168             sub register
169             {
170 1     1 0 215 my ($this, $transport, $callback) = @_;
171              
172             # Transport Domain and file descriptor must be valid.
173 1         3 my $fileno;
174              
175 1 50 33     8 if (!defined($transport) || !defined($fileno = $transport->fileno())) {
176 0         0 return $this->_error('The Transport Domain object is invalid');
177             }
178              
179             # NOTE: The callback must read the data associated with the
180             # file descriptor or the Dispatcher will continuously
181             # call the callback and get stuck in an infinite loop.
182              
183 1 50       18 if (!exists $this->{_descriptors}->{$fileno}) {
184              
185             # Make sure that the "readable" vector is defined.
186 1 50       9 if (!defined $this->{_rin}) {
187 1         3 $this->{_rin} = q{};
188             }
189              
190             # Add the file descriptor to the list.
191 1         6 $this->{_descriptors}->{$fileno} = [
192             $this->_callback_create($callback), # Callback
193             $transport, # Transport Domain object
194             1 # Reference count
195             ];
196              
197             # Add the file descriptor to the "readable" vector.
198 1         5 vec($this->{_rin}, $fileno, 1) = 1;
199              
200 1         10 DEBUG_INFO('added handler for descriptor [%d]', $fileno);
201              
202             } else {
203             # Bump up the reference count.
204 0         0 $this->{_descriptors}->{$fileno}->[2]++;
205             }
206              
207 1         4 return $transport;
208             }
209              
210             sub deregister
211             {
212 1     1 0 476 my ($this, $transport) = @_;
213              
214             # Transport Domain and file descriptor must be valid.
215 1         2 my $fileno;
216              
217 1 50 33     23 if (!defined($transport) || !defined($fileno = $transport->fileno())) {
218 0         0 return $this->_error('The Transport Domain object is invalid');
219             }
220              
221 1 50       12 if (exists $this->{_descriptors}->{$fileno}) {
222              
223             # Check reference count.
224 1 50       5 if (--$this->{_descriptors}->{$fileno}->[2] < 1) {
225              
226             # Remove the file descriptor from the list.
227 1         4 delete $this->{_descriptors}->{$fileno};
228              
229             # Remove the file descriptor from the "readable" vector.
230 1         5 vec($this->{_rin}, $fileno, 1) = 0;
231              
232             # Undefine the vector if there are no file descriptors;
233             # some systems expect this to make select() work properly.
234              
235 1 50       2 if (!keys %{$this->{_descriptors}}) {
  1         5  
236 1         7 $this->{_rin} = undef;
237             }
238              
239 1         4 DEBUG_INFO('removed handler for descriptor [%d]', $fileno);
240             }
241              
242             } else {
243 0         0 return $this->_error('The Transport Domain object is not registered');
244             }
245              
246 1         4 return $transport;
247             }
248              
249             sub error
250             {
251 0   0 0 0 0 return $_[0]->{_error} || q{};
252             }
253              
254             sub debug
255             {
256 0 0   0 0 0 return (@_ == 2) ? $DEBUG = ($_[1]) ? TRUE : FALSE : $DEBUG;
    0          
257             }
258              
259             # [private methods] ----------------------------------------------------------
260              
261             sub _new
262             {
263 1     1   4 my ($class) = @_;
264              
265             # The constructor is private since we only want one
266             # Dispatcher object.
267              
268 1         7 return bless {
269             '_active' => FALSE, # State of this Dispatcher object
270             '_error' => undef, # Error message
271             '_event_queue_h' => undef, # Head of the event queue
272             '_event_queue_t' => undef, # Tail of the event queue
273             '_rin' => undef, # Readable vector for select()
274             '_descriptors' => {}, # List of file descriptors to monitor
275             }, $class;
276             }
277              
278             sub _send_pdu
279             {
280 0     0   0 my ($this, $pdu, $retries) = @_;
281              
282             # Pass the PDU to Message Processing so that it can
283             # create the new outgoing message.
284              
285 0         0 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg($pdu);
286              
287 0 0       0 if (!defined $msg) {
288             # Inform the command generator about the Message Processing error.
289 0         0 $pdu->status_information($MESSAGE_PROCESSING->error());
290 0         0 return;
291             }
292              
293             # Actually send the message.
294              
295 0 0       0 if (!defined $msg->send()) {
296              
297             # Delete the msgHandle.
298 0 0       0 if ($pdu->expect_response()) {
299 0         0 $MESSAGE_PROCESSING->msg_handle_delete($msg->msg_id());
300             }
301              
302             # A crude attempt to recover from temporary failures.
303 0 0 0     0 if (($retries-- > 0) && ($!{EAGAIN} || $!{EWOULDBLOCK})) {
      0        
304 0         0 DEBUG_INFO('attempting recovery from temporary failure');
305 0         0 $this->schedule($pdu->timeout(), [\&_send_pdu, $pdu, $retries]);
306 0         0 return FALSE;
307             }
308              
309             # Inform the command generator about the send() error.
310 0         0 $pdu->status_information($msg->error());
311              
312 0         0 return;
313             }
314              
315             # Schedule the timeout handler if the message expects a response.
316              
317 0 0       0 if ($pdu->expect_response()) {
318 0         0 $this->register($msg->transport(), [\&_transport_response_received]);
319 0         0 $msg->timeout_id(
320             $this->schedule(
321             $pdu->timeout(),
322             [\&_transport_timeout, $pdu, $retries, $msg->msg_id()]
323             )
324             );
325             }
326              
327 0         0 return TRUE;
328             }
329              
330             sub _transport_timeout
331             {
332 0     0   0 my ($this, $pdu, $retries, $handle) = @_;
333              
334             # Stop waiting for responses.
335 0         0 $this->deregister($pdu->transport());
336              
337             # Delete the msgHandle.
338 0         0 $MESSAGE_PROCESSING->msg_handle_delete($handle);
339              
340 0 0       0 if ($retries-- > 0) {
341              
342             # Resend a new message.
343 0         0 DEBUG_INFO('retries left %d', $retries);
344 0         0 return $this->_send_pdu($pdu, $retries);
345              
346             } else {
347              
348             # Inform the command generator about the timeout.
349 0         0 $pdu->status_information(
350             q{No response from remote host "%s"}, $pdu->hostname()
351             );
352 0         0 return;
353              
354             }
355             }
356              
357             sub _transport_response_received
358             {
359 0     0   0 my ($this, $transport) = @_;
360              
361             # Clear any previous errors
362 0         0 $this->_error_clear();
363              
364 0 0       0 if (!ref $transport) {
365 0         0 die 'FATAL: The Transport Domain object is invalid';
366             }
367              
368             # Create a new Message object to receive the response
369 0         0 my ($msg, $error) = Net::SNMP::Message->new(-transport => $transport);
370              
371 0 0       0 if (!defined $msg) {
372 0         0 die sprintf 'Failed to create Message object: %s', $error;
373             }
374              
375             # Read the message from the Transport Layer
376 0 0       0 if (!defined $msg->recv()) {
377 0 0       0 if (!$transport->connectionless()) {
378 0         0 $this->deregister($transport);
379             }
380 0         0 return $this->_error($msg->error());
381             }
382              
383             # For connection-oriented Transport Domains, it is possible to
384             # "recv" an empty buffer if reassembly is required.
385              
386 0 0       0 if (!$msg->length()) {
387 0         0 DEBUG_INFO('ignoring zero length message');
388 0         0 return FALSE;
389             }
390              
391             # Hand the message over to Message Processing.
392 0 0       0 if (!defined $MESSAGE_PROCESSING->prepare_data_elements($msg)) {
393 0         0 return $this->_error($MESSAGE_PROCESSING->error());
394             }
395              
396             # Set the error if applicable.
397 0 0       0 if ($MESSAGE_PROCESSING->error()) {
398 0         0 $msg->error($MESSAGE_PROCESSING->error());
399             }
400              
401             # Cancel the timeout.
402 0         0 $this->cancel($msg->timeout_id());
403              
404             # Stop waiting for responses.
405 0         0 $this->deregister($transport);
406              
407             # Notify the command generator to process the response.
408 0         0 return $msg->process_response_pdu();
409             }
410              
411             sub _event_create
412             {
413 4     4   6 my ($this, $time, $callback) = @_;
414              
415             # Create a new event anonymous array and add it to the queue.
416             # The event is initialized based on the current state of the
417             # Dispatcher object. If the Dispatcher is not currently running
418             # the event needs to be created such that it will get properly
419             # initialized when the Dispatcher is started.
420              
421 4 50       20 return $this->_event_insert(
422             [
423             $this->{_active}, # State of the object
424             $this->{_active} ? time() + $time : $time, # Execution time
425             $callback, # Callback reference
426             undef, # Previous event
427             undef, # Next event
428             ]
429             );
430             }
431              
432             sub _event_insert
433             {
434 8     8   14 my ($this, $event) = @_;
435              
436             # If the head of the list is not defined, we _must_ be the only
437             # entry in the list, so create a new head and tail reference.
438              
439 8 100       19 if (!defined $this->{_event_queue_h}) {
440 1         3 DEBUG_INFO('created new head and tail [%s]', $event);
441 1         4 return $this->{_event_queue_h} = $this->{_event_queue_t} = $event;
442             }
443              
444             # Estimate the midpoint of the list by calculating the average of
445             # the time associated with the head and tail of the list. Based
446             # on this value either start at the head or tail of the list to
447             # search for an insertion point for the new Event.
448              
449 7         16 my $midpoint = (($this->{_event_queue_h}->[_TIME] +
450             $this->{_event_queue_t}->[_TIME]) / 2);
451              
452              
453 7 50       15 if ($event->[_TIME] >= $midpoint) {
454              
455             # Search backwards from the tail of the list
456              
457 7         21 for (my $e = $this->{_event_queue_t}; defined $e; $e = $e->[_PREVIOUS]) {
458 7 50       10 if ($e->[_TIME] <= $event->[_TIME]) {
459 7         13 $event->[_PREVIOUS] = $e;
460 7         14 $event->[_NEXT] = $e->[_NEXT];
461 7 50       22 if ($e eq $this->{_event_queue_t}) {
462 7         10 DEBUG_INFO('modified tail [%s]', $event);
463 7         15 $this->{_event_queue_t} = $event;
464             } else {
465 0         0 DEBUG_INFO('inserted [%s] into list', $event);
466 0         0 $e->[_NEXT]->[_PREVIOUS] = $event;
467             }
468 7         13 return $e->[_NEXT] = $event;
469             }
470             }
471              
472 0         0 DEBUG_INFO('added [%s] to head of list', $event);
473 0         0 $event->[_NEXT] = $this->{_event_queue_h};
474 0         0 $this->{_event_queue_h} = $this->{_event_queue_h}->[_PREVIOUS] = $event;
475              
476             } else {
477              
478             # Search forward from the head of the list
479              
480 0         0 for (my $e = $this->{_event_queue_h}; defined $e; $e = $e->[_NEXT]) {
481 0 0       0 if ($e->[_TIME] > $event->[_TIME]) {
482 0         0 $event->[_NEXT] = $e;
483 0         0 $event->[_PREVIOUS] = $e->[_PREVIOUS];
484 0 0       0 if ($e eq $this->{_event_queue_h}) {
485 0         0 DEBUG_INFO('modified head [%s]', $event);
486 0         0 $this->{_event_queue_h} = $event;
487             } else {
488 0         0 DEBUG_INFO('inserted [%s] into list', $event);
489 0         0 $e->[_PREVIOUS]->[_NEXT] = $event;
490             }
491 0         0 return $e->[_PREVIOUS] = $event;
492             }
493             }
494              
495 0         0 DEBUG_INFO('added [%s] to tail of list', $event);
496 0         0 $event->[_PREVIOUS] = $this->{_event_queue_t};
497 0         0 $this->{_event_queue_t} = $this->{_event_queue_t}->[_NEXT] = $event;
498              
499             }
500              
501 0         0 return $event;
502             }
503              
504             sub _event_delete
505             {
506 8     8   14 my ($this, $event) = @_;
507              
508 8         15 my $info = q{};
509              
510             # Update the previous event
511 8 50       20 if (defined $event->[_PREVIOUS]) {
    50          
512 0         0 $event->[_PREVIOUS]->[_NEXT] = $event->[_NEXT];
513             } elsif ($event eq $this->{_event_queue_h}) {
514 8 100       25 if (defined ($this->{_event_queue_h} = $event->[_NEXT])) {
515 7         15 $info = sprintf ', defined new head [%s]', $event->[_NEXT];
516             } else {
517 1         5 DEBUG_INFO('deleted [%s], list is now empty', $event);
518 1         2 $this->{_event_queue_t} = undef @{$event};
  1         5  
519 1         109 return FALSE; # Indicate queue is empty
520             }
521             } else {
522 0         0 die 'FATAL: Attempted to delete Event object with an invalid head';
523             }
524              
525             # Update the next event
526 7 50       17 if (defined $event->[_NEXT]) {
    0          
527 7         13 $event->[_NEXT]->[_PREVIOUS] = $event->[_PREVIOUS];
528             } elsif ($event eq $this->{_event_queue_t}) {
529 0         0 $info .= sprintf ', defined new tail [%s]', $event->[_PREVIOUS];
530 0         0 $this->{_event_queue_t} = $event->[_PREVIOUS];
531             } else {
532 0         0 die 'FATAL: Attempted to delete Event object with an invalid tail';
533             }
534              
535 7         21 DEBUG_INFO('deleted [%s]%s', $event, $info);
536 7         6 undef @{$event};
  7         20  
537              
538             # Indicate queue still has entries
539 7         22 return TRUE;
540             }
541              
542             sub _event_init
543             {
544 4     4   6 my ($this, $event) = @_;
545              
546 4         7 DEBUG_INFO('initializing event [%s]', $event);
547              
548             # Save the time and callback because they will be cleared.
549 4         8 my ($time, $callback) = @{$event}[_TIME, _CALLBACK];
  4         8  
550              
551             # Remove the event from the queue.
552 4         10 $this->_event_delete($event);
553              
554             # Update the appropriate fields.
555 4         9 $event->[_ACTIVE] = $this->{_active};
556 4 50       17 $event->[_TIME] = $this->{_active} ? time() + $time : $time;
557 4         8 $event->[_CALLBACK] = $callback;
558              
559             # Insert the event back into the queue.
560 4         9 $this->_event_insert($event);
561              
562 4         10 return TRUE;
563             }
564              
565             sub _event_handle
566             {
567 12     12   20 my ($this, $timeout) = @_;
568              
569 12         29 my $time = time();
570              
571 12 50       4165 if (defined (my $event = $this->{_event_queue_h})) {
572              
573             # If the event was inserted with a non-zero delay while the
574             # Dispatcher was not active, the scheduled time of the event
575             # needs to be updated.
576              
577 12 100 66     36 if (!$event->[_ACTIVE] && $event->[_TIME]) {
578 4         12 return $this->_event_init($event);
579             }
580              
581 8 100       76 if ($event->[_TIME] <= $time) {
    50          
582              
583             # If the scheduled time of the event is past, execute it and
584             # set the timeout to zero to poll the descriptors immediately.
585              
586 4         14 $this->_callback_execute($event->[_CALLBACK]);
587 4         20 $this->_event_delete($event);
588 4         6 $timeout = 0;
589              
590             } elsif (!defined $timeout) {
591              
592             # Calculate the timeout for the next event unless one was
593             # specified by the caller.
594              
595 4         12 $timeout = $event->[_TIME] - $time;
596 4         13 DEBUG_INFO('event [%s], timeout = %.04f', $event, $timeout);
597              
598             }
599              
600             }
601              
602             # Check the file descriptors for activity.
603              
604 8         3992587 my $nfound = select(my $rout = $this->{_rin}, undef, undef, $timeout);
605              
606 8 50 33     96 if (!defined $nfound || $nfound < 0) {
    100          
607              
608 0 0       0 if ($!{EINTR}) { # Recoverable error
609 0         0 return FALSE;
610             } else {
611 0         0 die sprintf 'FATAL: select() error: %s', $!;
612             }
613              
614             } elsif ($nfound > 0) {
615              
616             # Find out which file descriptors have data ready for reading.
617              
618 1 50       4 if (defined $rout) {
619 1         3 for (keys %{$this->{_descriptors}}) {
  1         6  
620 1 50       5 if (vec $rout, $_, 1) {
621 1         3 DEBUG_INFO('descriptor [%d] ready for read', $_);
622 1         2 $this->_callback_execute(@{$this->{_descriptors}->{$_}}[0,1]);
  1         5  
623             }
624             }
625             }
626              
627             }
628              
629 8         48 return TRUE;
630             }
631              
632             sub _callback_create
633             {
634 5     5   7 my ($this, $callback) = @_;
635              
636             # Callbacks can be passed in two different ways. If the callback
637             # has options, the callback must be passed as an ARRAY reference
638             # with the first element being a CODE reference and the remaining
639             # elements the arguments. If the callback has no options it is
640             # just passed as a CODE reference.
641              
642 5 50 33     31 if ((ref($callback) eq 'ARRAY') && (ref($callback->[0]) eq 'CODE')) {
    0          
643 5         20 return $callback;
644             } elsif (ref($callback) eq 'CODE') {
645 0         0 return [$callback];
646             } else {
647 0         0 return [];
648             }
649             }
650              
651             sub _callback_execute
652             {
653 5     5   14 my ($this, @argv) = @_;
654              
655             # The callback is invoked passing a reference to this object
656             # with the parameters passed by the user next and then any
657             # parameters that the caller provides.
658              
659 5         11 my ($callback, @user_argv) = @{shift @argv};
  5         17  
660              
661             # Protect ourselves from user error.
662 5         17 eval { $callback->($this, @user_argv, @argv); };
  5         29  
663              
664 5 50       1917 return ($@) ? $this->_error($@) : TRUE;
665             }
666              
667             sub _error
668             {
669 0     0   0 my $this = shift;
670              
671 0 0       0 if (!defined $this->{_error}) {
672 0 0       0 $this->{_error} = (@_ > 1) ? sprintf(shift(@_), @_) : $_[0];
673 0 0       0 if ($this->debug()) {
674 0         0 printf "error: [%d] %s(): %s\n",
675             (caller 0)[2], (caller 1)[3], $this->{_error};
676             }
677             }
678              
679 0         0 return;
680             }
681              
682             sub _error_clear
683             {
684 0     0   0 return $_[0]->{_error} = undef;
685             }
686              
687             sub DEBUG_INFO
688             {
689 27 50   27 0 75 return $DEBUG if (!$DEBUG);
690              
691 0 0         return printf
692             sprintf('debug: [%d] %s(): ', (caller 0)[2], (caller 1)[3]) .
693             ((@_ > 1) ? shift(@_) : '%s') .
694             "\n",
695             @_;
696             }
697              
698             # ============================================================================
699             1; # [end Net::SNMP::Dispatcher]